Chapter 6. Data Transfer

Data transfer deals with three important questions:

§ How do you get data into a Hadoop cluster?

§ How do you get data out of a Hadoop cluster?

§ How do you move data from one Hadoop cluster to another Hadoop cluster?

In general, Hadoop is not a transactional engine, where data is loaded in small, discrete, related bits of information like it would be in an airline reservation system. Instead, data is bulk loaded from external sources such as flat files from sensors, bulk loads from sources like http://www.data.gov for U.S. federal government data or log files, or transfers from relational systems.

The Hadoop ecosystem contains a variety of great tools for working with your data. However, it’s rare for your data to start or end in Hadoop. It’s much more common to have a workflow that starts with data from external systems, such as logs from your web servers, and ends with analytics hosted on a business intelligence (BI) system.

Data transfer tools help move data between those systems. More specifically, data transfer tools provide three basic capabilities:

File transfer

Tools like Flume (described here) and DistCp (described here) help move files and flat text, such as log entries, into your Hadoop cluster.

Database transfer

Tools like Sqoop (described next) provide a simple mechanism for moving data between traditional relational databases, such as Oracle or SQL Server, and your Hadoop cluster.

Data triage

Tools like Storm (described here) can be used to quickly evaluate and categorize new data as it arrives in your Hadoop system.

Sqoop


License: Apache License, Version 2.0
Activity: High
Purpose: Transfer data between HDFS and relational databases
Official Page: http://sqoop.apache.org
Hadoop Integration: Fully Integrated

It’s likely that some of your data originates in a relational database management system (RDBMS) that is normally accessed via SQL. You could use your SQL engine to produce flat files to load into HDFS, and while dumps may load large datasets more quickly, you may have reason to take data directly from an RDBMS or place the results of your Hadoop processing into an RDBMS. Sqoop (meaning SQL to Hadoop) is designed to transfer data between Hadoop clusters and relational databases. Originally developed at Cloudera, it is now a top-level Apache project. While Sqoop automates much of the process, some SQL knowledge is required to make the transfer work properly. The Sqoop job is transformed into a MapReduce job that does the actual work.

An import starts with a database table that is read into HDFS as a delimited text file or in Avro or SequenceFile format. You can also export an HDFS file into an RDBMS; in that case, the MapReduce job reads a set of text-delimited files in HDFS in parallel and converts them into rows in an RDBMS table. There are options to filter rows and columns, alter delimiters, and more.

Tutorial Links

There’s an excellent series of lectures on this topic available on YouTube. Once you’ve watched Apache Sqoop Tutorial Part 1, you can jump to Parts 2, 3, and 4.

Example Code

Our movie review dataset is in a table in a PostgreSQL database, and we want to import it into a text file in Hadoop (moving data from Hadoop back into an RDBMS is also possible; a brief sketch of that direction follows this example):

myschema=> select * from moviereviews;
 reviewer |     title      | score
----------+----------------+-------
 Kevin    | Dune           |    10
 Kevin    | Casablanca     |     5
 Bob      | Blazing Saddles|     9
 Marshall | Dune           |     1

sqoop import --connect jdbc:postgresql://<host>/<database> \
  --table moviereviews --username JoeUser -P
<lots of lines omitted>

hadoop fs -cat moviereviews/part-m-00000
Kevin,Dune,10
Kevin,Casablanca,5
Bob, Blazing Saddles,9
Marshall,Dune,1
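Sqoop can also move data in the other direction with sqoop export, which reads delimited files from an HDFS directory and inserts them as rows into an existing database table. The following is only a minimal sketch: the table review_counts and the HDFS directory /user/joeuser/review_counts are hypothetical names, and you would substitute your own connection details:

sqoop export --connect jdbc:postgresql://<host>/<database> \
  --table review_counts --export-dir /user/joeuser/review_counts \
  --username JoeUser -P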

Flume


License: Apache License, Version 2.0
Activity: Medium
Purpose: Data collection and aggregation, especially for log data
Official Page: http://flume.apache.org
Hadoop Integration: Fully Integrated

You have identified data that lives in a feeder system and that you need in your Hadoop cluster for analysis, and now you need a way to move it there. In general, you cannot use FTP or SCP, as these tools transfer data between POSIX-compliant filesystems, and HDFS is not POSIX compliant. (Some Hadoop distributions, such as MapR or those certified to use Isilon OneFS, can accommodate this.) You could FTP the data to the native filesystem on a Hadoop node and then use an HDFS command like copyFromLocal, but this is tedious and single-threaded. Flume to the rescue!
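For comparison, the manual route mentioned above looks something like the following sketch; the local path and HDFS directory here are hypothetical:

# Copy a log file that was FTPed to the node's local disk into HDFS
hadoop fs -copyFromLocal /tmp/web-2015-01-01.log /data/logs/web-2015-01-01.log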

Flume is a reliable distributed system for collecting, aggregating, and moving large amounts of log data from multiple sources into HDFS. It supports complex multihop flows as well as fan-in and fan-out. Events are staged in a channel on each agent and are removed only after they have been delivered to the next agent in the chain or to the terminal sink, typically HDFS. A Flume agent is driven by a configuration file that lists the sources, sinks, and channels for the data flow. Typical use cases include loading log data into Hadoop.

Tutorial Links

Dr. Dobb’s Journal published an informative article on Flume. Readers who enjoy a lecture should check out this interesting presentation from 2011.

Example Code

To use Flume, you’ll first build a configuration file that describes the agent: the source, the sink, and the channel. Here the source is an exec source that tails a system log file, the sink is a file in HDFS, and the channel is a memory channel:

# xmpl.conf
# Name the components on this agent
agent1.sources = src1
agent1.sinks = snk1
agent1.channels = chn1

# Describe/configure the source
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /var/log/system.log

# Describe the sink
agent1.sinks.snk1.type = hdfs
agent1.sinks.snk1.hdfs.path = hdfs://n1:54310/tmp/system.log/
agent1.sinks.snk1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
agent1.channels.chn1.type = memory
agent1.channels.chn1.capacity = 1000
agent1.channels.chn1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.src1.channels = chn1
agent1.sinks.snk1.channel = chn1

# Then start the agent. As lines are added to the log file,
# they will be pushed to the memory channel and then to the
# HDFS file:
flume-ng agent --conf conf --conf-file xmpl.conf --name agent1 \
  -Dflume.root.logger=INFO,console

DistCp


License: Apache License, Version 2.0
Activity: Low
Purpose: Data movement between Hadoop clusters
Official Page: http://hadoop.apache.org/docs/r1.2.1/distcp2.html
Hadoop Integration: Fully Integrated

If you have a Hadoop cluster and worry about what would happen if the entire cluster became unusable, you have a disaster recovery (DR) or continuity of operations (COOP) issue. There are several strategies for dealing with this. One solution is to load all data into both a primary Hadoop cluster and a backup cluster located remotely from the primary; this is frequently called dual ingest. You would then have to run every job that runs on the primary cluster on the remote cluster as well to keep the result files in sync. While feasible, this is managerially complex. You might instead consider a built-in part of Apache Hadoop called DistCp. Short for distributed copy, DistCp is the primary tool for moving data between Hadoop clusters. You may want to use DistCp for other reasons as well, such as moving data from a test or development cluster to a production cluster. Commercial Hadoop distributions have their own tools for DR and COOP, some of which are built on top of DistCp.

Tutorial Links

Likely as a result of the single-minded simplicity of DistCp, there aren’t a whole lot of dedicated tutorials about the technology. Readers who are interested in digging deeper are invited to start with the official project page.

Example Code

Here’s how you would copy a file named source-file in the directory source-dir on the source system n1 to the directory dest-dir on the destination system n2, where n1 and n2 are the hostnames of the nodes on which the NameNodes for the source and destination clusters live. If you were using this code snippet in a DR situation, the source-dir and dest-dir would be the same, as would the source-file and dest-file:

$ hadoop distcp hdfs://n1:8020/source-dir/source-file \
    hdfs://n2:8020/dest-dir/dest-file
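In practice, a DR copy usually covers whole directories on a schedule rather than a single file. A minimal sketch, assuming a /data directory on both clusters, might use DistCp's -update option so that only files that differ on the destination are copied:

$ hadoop distcp -update hdfs://n1:8020/data hdfs://n2:8020/data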

Storm


License: Apache License, Version 2.0
Activity: High
Purpose: Streaming ingest
Official Page: http://storm.apache.org
Hadoop Integration: API Compatible

Many of the technologies in the big data ecosystem, including Hadoop MapReduce, are built with very large tasks in mind. These systems are designed to perform work in batches, bundling groups of smaller tasks into larger tasks and distributing those large tasks.

While batch processing is an effective strategy for performing complex analysis of very large amounts of data in a distributed and fault-tolerant fashion, it’s ill-suited for processing data in real time. This is where a system like Storm comes in. Storm follows a stream processing model rather than a batch processing model. This means it’s designed to quickly perform relatively simple transformations of very large numbers of small records.

In Storm, a workflow is called a “topology,” with inputs called “spouts” and transformations called “bolts.” It’s important to note that Storm topologies are very different from MapReduce jobs, because jobs have a beginning and an end while topologies do not. The intent is that once you define a topology, data will continue to stream in from your spout and be processed through a series of bolts.

Tutorial Links

In addition to the official Storm tutorial, there is an excellent set of starter resources in GitHub in the Storm-Starter project.

Example Code

In this example, we’re going to build a topology that reads comma-delimited reviews from a ReviewSpout and keeps track of the number of times each title is reviewed. Defining a Storm topology can get a little involved, so we’ll just cover the highlights.

The first step of defining a topology is to define our inputs. We do this by associating a spout with our topology. This spout will be responsible for reading data from some source, such as a Twitter feed or an RSS feed.

Once we have our spout defined, we can start defining bolts. Bolts are responsible for processing our data. In this case, we have two bolts—the first extracts the movie title from a review, and the second counts the number of times an individual title appears:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("review_spout", new ReviewSpout(), 10);

// Subscribe the title-extraction bolt to the spout's output stream
builder.setBolt("extract_title", new TitleBolt(), 8)
       .shuffleGrouping("review_spout");

// Group by title so every review of a given film reaches the same
// counter task (assumes TitleBolt emits a field named "title")
builder.setBolt("count", new TitleCount(), 15)
       .fieldsGrouping("extract_title", new Fields("title"));

// Build the "conf" object and configure it appropriately
// for your job
...

StormSubmitter.submitTopology("review_counter", conf,
    builder.createTopology());

Spouts and bolts can be authored in a variety of languages, and you can even mix languages in an individual topology. For example, we authored our topology in Java, but we’re going to write one of our bolts in Python. This bolt extracts the film title from a review by splitting the review on commas and retrieving the second field:

import storm

class TitleBolt(storm.BasicBolt):
    def process(self, tuple):
        words = tuple.values[0].split(",")
        storm.emit([words[1]])

TitleBolt().run()