Chapter 5. Ingest and Organize Data Lake

In this chapter, we will look at how to ingest data into the newly created Data Lake and organize it to make it effective and useful. The topics covered in this chapter are as follows:

· End-to-end Data Lake solution

· Ingest data using HDFS commands

· Ingest data to Azure Blob using Azure PowerShell

· Ingest data using CloudXplorer

· Using Sqoop to move data from RDBMS to cluster

· Organizing your data in HDFS

· Managing metadata using HCatalog

End-to-end Data Lake solution

In the next few chapters, we will build an end-to-end Data Lake solution using HDInsight. As discussed in Chapter 2, Enterprise Data Lake using HDInsight, the three key components required for a Data Lake are:

· Ingest and organize

· Transform

· Access, analyze, and report

To understand these concepts, we will use real flight on-time performance data from the RITA website with the URL http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236.

In this chapter, we will focus on ingest and organize components:

End-to-end Data Lake solution

Ingesting to Data Lake using HDFS commands

The simplest way to upload files is to use the Hadoop command line. The following are the steps to load data into the Data Lake.

Connecting to a Hadoop client

You can connect to the Hadoop cluster via a remote desktop connection to the active head node. After establishing the remote connection, launch the Hadoop command-line application that can be found as a shortcut on the desktop.

Getting your files on the local storage

Get your files on the edge node, either via web download, SCP, or SFTP. The following figure shows you the steps to download the on-time performance data from the website. The steps are to select the year, month, and other similar fields and then click on Download. The fields that we need for the project are listed as follows:

YEAR, QUARTER, MONTH, DAY_OF_MONTH, DAY_OF_WEEK, FL_DATE, UNIQUE_CARRIER, AIRLINE_ID, FL_NUM, ORIGIN_AIRPORT_ID, ORIGIN_AIRPORT_SEQ_ID, ORIGIN_CITY_MARKET_ID, ORIGIN, ORIGIN_STATE_ABR, DEST_AIRPORT_ID, DEST_AIRPORT_SEQ_ID, DEST_CITY_MARKET_ID, DEST, DEST_STATE_ABR, DEP_TIME, DEP_DELAY, ARR_TIME, ARR_DELAY, CANCELLED, CANCELLATION_CODE, DIVERTED, AIR_TIME, DISTANCE, CARRIER_DELAY, WEATHER_DELAY, NAS_DELAY, SECURITY_DELAY, and LATE_AIRCRAFT_DELAY.

Additionally, we will download the lookup files.

Getting your files on the local storage

This website downloads the files to your local directory. Rename the files to the following format: T_ONTIME_YYYYMM.csv. The following list shows you the files required for the mini project:

· /otp/stage/rawotp/T_ONTIME_201301.csv through /otp/stage/rawotp/T_ONTIME_201406.csv: The detailed on-time performance flight data for every flight, organized as one file per year and month.

· /otp/stage/lookup/airportcode.csv: A lookup file that has the full name for each airport abbreviation.

· /otp/stage/lookup/cancellationcode.csv: A lookup file that has the full description for each cancellation code abbreviation.

· /otp/stage/lookup/dayofweek.csv: A lookup file that has the full description for each day of the week.

Transferring to HDFS

Launch the Hadoop command line and use the following commands to upload the complete directory to HDFS:

C:\> hadoop fs -mkdir -p /otp/stage/rawotp

C:\> hadoop fs -put D:\Users\hdinduser2\Downloads\OTPData\stage\rawotp\ /otp/stage/rawotp/

C:\> hadoop fs -mkdir -p /otp/stage/lookup

C:\> hadoop fs -put D:\Users\hdinduser2\Downloads\OTPData\stage\lookup\ /otp/stage/lookup/
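To confirm the transfer, you can list the target directories from the same Hadoop command line; this is a quick sanity check, and the exact listing will depend on how the local folders were staged:

C:\> hadoop fs -ls -R /otp/stage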

Loading data to Azure Blob storage using Azure PowerShell

Azure HDInsight provides 100 percent HDFS functionality using Azure Blob storage under the covers. This means that, to load data for the cluster, we can load it straight to Azure Blob storage without the HDInsight cluster needing to be running, which makes this approach more cost effective. In this section, we will see how to load data to Azure Blob using Azure PowerShell.
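As a side note, once an HDInsight cluster is attached to this storage account, the same blobs are addressable from the Hadoop command line through the wasb scheme. The following is a minimal illustration, assuming the hdind container and the hdindstorage account used later in this section:

C:\> hadoop fs -ls wasb://hdind@hdindstorage.blob.core.windows.net/otp/stage/rawotp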

To load the data to Azure Blob storage from a local filesystem, we will perform the following steps:

1. Open Azure PowerShell using the shortcut on your desktop and type in the Get-AzurePublishSettingsFile command. This will open the manage.windowsazure.com web page and download a .publishsettings file.

2. Next, type in the Import-AzurePublishSettingsFile command with the downloaded .publishsettings file to import the management certificate.

3. If you have multiple Azure subscriptions, you will need to use the Set-AzureSubscription command to set the PowerShell context to the right subscription:

Set-AzureSubscription -SubscriptionName "Pay-As-You-Go"

4. Next, set the storage account name and container name, get the storage key, and set the storage context:

$storageAccountName = "hdindstorage"

$containerName = "hdind"

$storageAccountKey = Get-AzureStorageKey $storageAccountName | %{$_.Primary}

$storageContext = New-AzureStorageContext -StorageAccountName $storageAccountName -StorageAccountKey $storageAccountKey

Note

Make sure that the container already exists; otherwise you will get an error.

5. Next, set the variables for the local filename and the Blob location in Azure:

$fileName = "Z:\HDInsightver2\OTPData\otp\stage\rawotp\T_ONTIME_201201.csv"

$blobName = "otp/stage/rawotp/T_ONTIME_201201.csv"

6. Finally, upload the file using the Set-AzureStorageBlobContent command, as follows:

Set-AzureStorageBlobContent -File $fileName -Container $containerName -Blob $blobName -Context $storageContext

With the preceding steps, we have successfully loaded the local file Z:\HDInsightver2\OTPData\otp\stage\rawotp\T_ONTIME_201201.csv to the Azure Blob otp/stage/rawotp/T_ONTIME_201201.csv in the hdind container.

The following screenshot shows you the results of the preceding commands:

Loading data to Azure Blob storage using Azure PowerShell
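Optionally, you can verify the upload from PowerShell by listing the blobs under the target prefix; the following is a minimal check that reuses the storage context created earlier:

Get-AzureStorageBlob -Container $containerName -Context $storageContext -Prefix "otp/stage/rawotp/"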

Note

The preceding script can be tailored to automate regular data ingestion into the Data Lake.
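For example, the following is a minimal sketch of such an automation; it assumes the $containerName and $storageContext variables from the preceding steps and a hypothetical local staging folder, and it loops over the .csv files and uploads each one:

# Local staging folder (hypothetical path)
$localFolder = "Z:\HDInsightver2\OTPData\otp\stage\rawotp"
# Upload every .csv file under the matching blob prefix
Get-ChildItem -Path $localFolder -Filter *.csv | ForEach-Object {
    $blobName = "otp/stage/rawotp/" + $_.Name
    Set-AzureStorageBlobContent -File $_.FullName -Container $containerName -Blob $blobName -Context $storageContext -Force
}

A script like this can then be scheduled, for example with Windows Task Scheduler, to run after each new download.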

Loading files to Data Lake using GUI tools

In this section, we will see how to load data to the cluster using graphical user interface tools.

Storage access keys

To use any of the Azure Blob storage explorer tools, you will need your storage access keys. For instructions, you can refer to Chapter 4, Administering Your HDInsight Cluster.

Storage tools

There are several tools that you can use to load data to the Azure Storage; some of them are freeware. I have listed the popular ones here and will show you CloudXplorer in detail:

· CloudXplorer: http://clumsyleaf.com/products/cloudxplorer

· Azure Storage Explorer: http://azurestorageexplorer.codeplex.com/

· Azure Management Studio: http://www.cerebrata.com/products/azure-management-studio/introduction

· CloudBerry Explorer: http://www.cloudberrylab.com/microsoft-azure-explorer-pro.aspx

CloudXplorer

CloudXplorer is a simple GUI tool that allows you to browse, upload, and delete blobs in Azure Storage. It has a simple Windows Explorer-like interface and supports multiple Windows Azure storage accounts.

Key benefits

The following are the key benefits of using a tool such as CloudXplorer:

· The CloudXplorer user interface is intuitive and easy to use, similar to Windows File Explorer

· It supports upload of multiple files and directories with the option to run parallel threads

· It supports restart of failed uploads

· It supports copy and move operations between folders, containers, and even accounts

Let's explore this in detail. First, download and install the software from http://clumsyleaf.com/products/cloudxplorer.

Registering your storage account

After installation, you will need to register the Azure storage account used for HDInsight with CloudXplorer. Perform the following steps:

1. Click on Manage Accounts.

2. Select Azure Blobs account.

3. Enter the storage account name and the storage key, as obtained from the Azure portal.

The following screenshot shows you the preceding steps:

Registering your storage account

Uploading files to your Blob storage

Next, perform the following steps to upload local files as Azure blobs:

1. Browse to the desired storage container and directory. (You can also add new directories by clicking on the New icon next to Upload.)

2. Click on the Upload icon on the menu bar.

2. Select the file(s) you want to upload and click on Open.

4. This will start the upload to the Azure storage in the cloud.

5. Optionally, you can change the preferences to run multiple upload threads in parallel.

6. Once the upload is complete, you can verify the files by viewing the list in the directory that you uploaded them to.

The following screenshot shows you the preceding steps:

Uploading files to your Blob storage

With the preceding steps, we have uploaded the six .csv files to Azure Blob storage using CloudXplorer.

Using Sqoop to move data from RDBMS to Data Lake

Sqoop enables us to transfer data between any relational database and Hadoop. You can import data from any relational database that has a JDBC adaptor such as SQL Server, MySQL, Oracle, Teradata, and others, to HDInsight.

Key benefits

The major benefits of using Sqoop to move data are as follows:

· Leverages RDBMS metadata to get the column data types

· It is simple to script and uses SQL

· It can be used to handle change data capture by importing daily transactional data to HDInsight

· It uses MapReduce for export and import that enables parallel and efficient data movement

Two modes of using Sqoop

Sqoop can be used to get data into and out of Hadoop; it has two modes of operation:

· Sqoop import: Data moves from RDBMS to HDInsight

· Sqoop export: Data moves from HDInsight to RDBMS

The following screenshot shows you two modes of using Sqoop:

Two modes of using Sqoop

Using Sqoop to import data (SQL to Hadoop)

The following is the setup for a Sqoop import demonstration:

· Source database: Teradata

· Server: 10.11.12.6

· SQL username: dbc

· SQL Password: dbc123

· Table: airline_otp (has 509,519 rows)

· Target: /user/rajn/TDStage2

The following are the steps to import data using Sqoop:

1. Ensure that you have the necessary JDBC drivers. Copy them to the Sqoop lib folder. In my installation, it was C:\Hadoop\sqoop-1.4.2\lib. I copied terajdbc4.jar and tdgssconfig.jar.

Note

A prerequisite for Sqoop to work is network connectivity to the source database, which requires the proper ports and firewall settings so that the HDInsight cluster can communicate with the database server.

2. Use Windows PowerShell and run the Sqoop import command:

C:\Hadoop\sqoop-1.4.2\bin> .\sqoop import --connect "jdbc:teradata://10.11.12.6" --username dbc --password dbc123 --table "airline_otp" --driver com.teradata.jdbc.TeraDriver --target-dir /user/rajn/TDStage2 -m 1

Note

-m 1 ensures that there is only one map task and hence a single output file that contains the complete dataset.

3. Verify the data in HDFS using a tail command:

c:\Hadoop\sqoop-1.4.2\lib>hadoop fs -tail /user/rajn/TDStage2/part-m-00000

4. You can optionally filter the data using a where clause. For the preceding dataset, let's filter for flights where the carrier is AA (American Airlines):

C:\Hadoop\sqoop-1.4.2\bin> .\sqoop import --connect "jdbc:teradata://10.11.12.6" --username dbc --password dbc123 --table "airline_otp" --driver com.teradata.jdbc.TeraDriver --target-dir /user/rajn/TDStage3 --where " uniquecarrier='AA' " -m 1

Sqoop can also be used to export data out from Hadoop and send it to RDBMS. For details, refer to the Sqoop User Guide. The link to the user guide is available at http://sqoop.apache.org/docs/1.4.4/SqoopUserGuide.html#_literal_sqoop_export_literal.
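As an illustration only, an export typically mirrors the import syntax. In the following hedged sketch, the Teradata target table summary_otp and the HDFS directory /user/rajn/TDSummary are hypothetical, and the target table must already exist in the database:

C:\Hadoop\sqoop-1.4.2\bin> .\sqoop export --connect "jdbc:teradata://10.11.12.6" --username dbc --password dbc123 --table "summary_otp" --driver com.teradata.jdbc.TeraDriver --export-dir /user/rajn/TDSummary -m 1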

Organizing your Data Lake in HDFS

As you load files into your Data Lake, it is important to manage this process so that data consumers can find the right data. Organization of data requires planning, coordination, and governance. One proposed model that I have seen used by several clients is to have three main directories (a sketch of the corresponding HDFS commands follows this list):

· Staging: This directory will host all the original source files, as they get ingested to the Data Lake. Each source should have its own directory. For example, let's consider that an organization has two financial databases, findb01 and findb02. A proposed directory structure in Data Lake can be /data/stage/findb01 and /data/stage/findb02.

· Cleansed: The data in staging should go through basic audit and data quality checks to ensure that it meets the organization's standards. For example, if sales data is being ingested into the Data Lake, the state and country codes in the sales records should be valid. The cleansed data should be grouped by subject area, for example, finance and sales. A proposed directory structure for the cleansed data is /data/cleansed/finance.

· Summarized: Data that is frequently used by reports should be precomputed for performance; this is called summarized data. For example, a regular report required by an enterprise is quarterly profitability and forecast by country, region, and state, which requires data from sales and finance to be summarized. A proposed directory structure for the summarized data is /data/summary/findw.
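The following is a minimal sketch of how this proposed layout could be created up front from the Hadoop command line, using the example directory names mentioned above:

C:\> hadoop fs -mkdir -p /data/stage/findb01
C:\> hadoop fs -mkdir -p /data/stage/findb02
C:\> hadoop fs -mkdir -p /data/cleansed/finance
C:\> hadoop fs -mkdir -p /data/summary/findw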

To keep track of which file is in which directory, it is recommended that you index all this information into a search index such as Elasticsearch or Solr. The following is the recommended information for such an index: date of ingestion, filename, source information, destination, file size, and any additional attributes that are known about the file. This can be accomplished by a scheduled Oozie workflow that scans the data files in HDFS and builds or updates the search index.
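For illustration, a hypothetical index document for one ingested file might look like the following; the field names and values are examples only, not a prescribed schema:

{
  "ingest_date": "2014-07-01",
  "filename": "T_ONTIME_201406.csv",
  "source": "RITA on-time performance download",
  "destination": "/otp/stage/rawotp/T_ONTIME_201406.csv",
  "file_size_bytes": 1048576,
  "attributes": { "subject_area": "flight on-time performance" }
}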

The following figure shows you the data flow and the various directories in a managed Data Lake:

Organizing your Data Lake in HDFS

Managing file metadata using HCatalog

Organizing data in specific directories based on content and source provides the foundation for a well-managed Data Lake. In addition to the file location, a managed Data Lake should capture key attributes and structure information of each file; for example, for the sales table being ingested into the Data Lake at /data/stage/salesdb01/sales, the attributes will be as follows:

· Structure of the file: For example, fixed length, delimited, XML, JSON, sequence, and columnar (RC)

· Fields/columns in the data file: For example, fiscal quarter, $amount

· Data types of the fields: For example, integer, string, double, and string

Apache HCatalog provides a table management system for the HDFS-based filesystem. It provides the equivalent of SQL Server's information_schema tables. HCatalog stores the format and structure information of the files.

Key benefits

The following are the key benefits of using HCatalog:

· Stores structural metadata of HDFS files in a shared metastore

· Provides an interface to the metastore for various Hadoop tools such as Pig, Hive, and MapReduce

· Provides table abstraction so that users don't have to worry about where and how data is stored in the Data Lake

· Provides APIs and command-line tools to interact with HCatalog metadata

The following figure shows the architecture of HCatalog and Hadoop ecosystem tools:

Key benefits

Using HCatalog Command Line to create tables

HCatalog Command Line is a convenient way to register metadata with HCatalog. Let's review how this can be done for our OTP datasets that we ingested to Hadoop.

First, we will build an HCatalog script that has four create table commands for the OTP data and the lookups. We will create these tables as external tables, which means that the data files are retained in their original directory and HCatalog only references the location.

Let's review the create table commands, which you can save as an otpcreatetable.sql file:

-- Create Database

create database otpdw;

use otpdw;

-- Create stage table for OTP data

CREATE EXTERNAL TABLE airline_otp_stage (

flightyear SMALLINT, -- Year

flightquarter SMALLINT, -- Quarter

flightmonth SMALLINT, -- Month

flightdayofmonth SMALLINT, -- Day of Month

flightdayofweek SMALLINT, -- Day of week

flightdate STRING, -- Flight Date (yyyy-mm-dd)

uniquecarrier STRING, -- Unique Carrier Code.

airlineid INT, -- Identification number carrier

flightnum STRING, -- Flight Number

originairportid INT, -- Origin Airport,

originaiportseqid INT, -- Airport Seq ID

origincitymarketid INT, -- CityMarketID

originairportabr STRING, -- Origin airport ABR

origincityname STRING, -- Origin city name

originstateabr STRING, -- Origin State ABR

destairportid INT, -- Destination airport

destaiportseqid INT, -- Destination airport seq ID

destcitymarketid INT, -- CityMarketID

destairportabr STRING, -- Destination airport ABR

destcityname STRING, -- Destination city name

deststateabr STRING, -- Destination state ABR

deptime STRING, -- Actual Departure Time (hhmm)

depdelay STRING, -- Difference in minutes

arrtime STRING, -- Actual Arrival Time (hhmm)

arrdelay STRING, -- Difference in minutes

cancelled STRING, -- Cancelled (1=Yes) -> true/false

cancelcode STRING, -- Cancellation code

diverted STRING, -- (1=Yes) -> true/false

airtime DOUBLE, -- Airtime in minutes

distance DOUBLE, -- Distance in miles

carrierdelay STRING, -- delay due to carrier

weatherdelay STRING, -- delay due to weather

nasdelay STRING, -- delay due to nas

securitydelay STRING, -- delay due to security

lateaircraftdelay STRING -- delay due to late aircraft

)

row format DELIMITED

fields terminated by ','

LOCATION '/otp/stage/rawotp';

-- airport code

CREATE EXTERNAL TABLE airport_code_stage (

aiportcode STRING, -- Airport code abbreviation, for example, "DFW"

airportdescription STRING -- description of airport

)

row format DELIMITED

fields terminated by ','

LOCATION '/otp/stage/lookup/airportcode';

-- cancellation code

CREATE EXTERNAL TABLE cancellation_code_stage (

cancellationcode STRING, -- Cancel code like A

cancellationdescription STRING -- description like Carrier

)

row format DELIMITED

fields terminated by ','

LOCATION '/otp/stage/lookup/cancellationcode';

-- Dayofweek code

CREATE EXTERNAL TABLE dayofweek_code_stage (

dayofweekcode STRING, -- Day of week code like 1

dayofweekcodedescription STRING -- description like Monday

)

row format DELIMITED

fields terminated by ','

LOCATION '/otp/stage/lookup/dayofweekcode';

Next, we will call the script via the HCatalog command-line interface. Let's perform the following steps to do so:

1. Log in to the head node of the HDInsight cluster using the remote desktop.

2. Launch the Hadoop Command Line executable.

3. Run the following commands to call HCatalog and execute the create table script:

c:\> cd C:\apps\dist\hive-0.13.0.2.1.5.0-2057\hcatalog\bin

C:\apps\dist\hive-0.13.0.2.1.5.0-2057\hcatalog\bin>hcat.py -f D:\Users\hdinduser2\Downloads\otpcreatetable.sql

After the preceding command is run, the four tables are registered and visible in HCatalog and Hive.
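To spot-check the registration, you can also run an inline DDL statement through the same command-line interface; the following is a minimal sketch that uses the -e option to execute a statement directly:

C:\apps\dist\hive-0.13.0.2.1.5.0-2057\hcatalog\bin>hcat.py -e "show tables in otpdw"

You can run a similar check from the Hive console with the same statement.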

Summary

An Enterprise Data Lake journey starts with getting valuable data into the lake. There are several mechanisms to ingest data into a Data Lake powered by HDInsight, primarily HDFS transfer, Azure PowerShell, Azure tools with a user interface, and Sqoop. In order to make a Data Lake easy to consume, it is important to have a managed ingestion process with governance and a well-defined structure for the various directories.

HCatalog provides a shared metastore that can be used by various tools in Hadoop, namely, Hive, Pig, and MapReduce. This ensures that the structural information is defined once and leveraged by these tools. In the next chapter, we will look into the transformation of the data that we just ingested.