

Part 2. Data logistics

If you’ve been thinking about how to work with Hadoop in production settings, you’ll benefit from this part of the book, which covers the first set of hurdles you’ll need to jump. These chapters detail the often-overlooked yet crucial topics that deal with data management in Hadoop.

Chapter 3 looks at ways to work with data stored in different formats, such as XML and JSON, paving the way for a broader examination of data formats such as Avro and Parquet that work best with big data and Hadoop.

Chapter 4 examines some strategies for laying out your data in HDFS, and partitioning and compacting your data. This chapter also covers ways of working with small files, as well as how compression can save you from many storage and computational headaches.

Chapter 5 looks at ways to manage moving large quantities of data into and out of Hadoop. Examples include working with relational data in RDBMSs, structured files, and HBase.

Chapter 3. Data serialization-working with text and beyond

This chapter covers

· Working with text, XML, and JSON

· Understanding SequenceFile, Avro, Protocol Buffers, and Parquet

· Working with custom data formats

MapReduce offers straightforward, well-documented support for working with simple data formats such as log files. But the use of MapReduce has evolved beyond log files to more sophisticated data-serialization formats, such as text, XML, and JSON, to the point where its documentation and built-in support run dry. The goal of this chapter is to document how you can work with common data-serialization formats, as well as to examine more structured serialization formats and compare their fitness for use with MapReduce.

Imagine that you want to work with the ubiquitous XML and JSON data-serialization formats. These formats work in a straightforward manner in most programming languages, with several tools being available to help you with marshaling, unmarshaling, and validating where applicable. Working with XML and JSON in MapReduce, however, poses two equally important challenges. First, MapReduce requires classes that can support reading and writing a particular data-serialization format; if you’re working with a custom file format, there’s a good chance it doesn’t have such classes to support the serialization format you’re working with. Second, MapReduce’s power lies in its ability to parallelize reading your input data. If your input files are large (think hundreds of megabytes or more), it’s crucial that the classes reading your serialization format be able to split your large files so multiple map tasks can read them in parallel.

We’ll start this chapter by tackling the problem of how to work with serialization formats such as XML and JSON. Then we’ll compare and contrast data-serialization formats that are better suited to working with big data, such as Avro and Parquet. The final hurdle is when you need to work with a file format that’s proprietary, or a less common file format for which no read/write bindings exist in MapReduce. I’ll show you how to write your own classes to read/write your file format.

XML and JSON formats

This chapter assumes you’re familiar with the XML and JSON data formats. Wikipedia provides some good background articles on XML and JSON, if needed. You should also have some experience writing MapReduce programs and should understand the basic concepts of HDFS and MapReduce input and output. Chuck Lam’s book, Hadoop in Action (Manning, 2010), is a good resource on this topic.

Data serialization support in MapReduce is a property of the input and output classes that read and write MapReduce data. Let’s start with an overview of how MapReduce supports data input and output.

3.1. Understanding inputs and outputs in MapReduce

Your data might be XML files sitting behind a number of FTP servers, text log files sitting on a central web server, or Lucene indexes in HDFS.[1] How does MapReduce support reading and writing to these different serialization structures across the various storage mechanisms?

1 Apache Lucene is an information retrieval project that stores data in an inverted index data structure optimized for full-text search. More information is available at http://lucene.apache.org/.

Figure 3.1 shows the high-level data flow through MapReduce and identifies the actors responsible for various parts of the flow. On the input side, you can see that some work (Create split) is performed outside of the map phase, and other work is performed as part of the map phase (Read split). All of the output work is performed in the reduce phase (Write output).

Figure 3.1. High-level input and output actors in MapReduce

Figure 3.2 shows the same flow with a map-only job. In a map-only job, the MapReduce framework still uses the OutputFormat and RecordWriter classes to write the outputs directly to the data sink.

Figure 3.2. Input and output actors in MapReduce with no reducers

Let’s walk through the data flow and discuss the responsibilities of the various actors. As we do this, we’ll also look at the relevant code from the built-in TextInputFormat and TextOutputFormat classes to better understand the concepts. The TextInputFormat and TextOutputFormat classes read and write line-oriented text files.

3.1.1. Data input

The two classes that support data input in MapReduce are InputFormat and RecordReader. The InputFormat class is consulted to determine how the input data should be partitioned for the map tasks, and the RecordReader performs the reading of data from the inputs.

InputFormat

Every job in MapReduce must define its inputs according to contracts specified in the InputFormat abstract class. InputFormat implementers must fulfill three contracts: they describe type information for map input keys and values, they specify how the input data should be partitioned, and they indicate the RecordReader instance that should read the data from source. Figure 3.3 shows the InputFormat class and how these three contracts are defined.

Figure 3.3. The annotated InputFormat class and its three contracts

Arguably, the most crucial contract is that of determining how to divide the input data. In MapReduce nomenclature, these divisions are referred to as input splits. The input splits directly impact the map parallelism, because each split is processed by a single map task. Working with an InputFormat that’s unable to create multiple input splits over a single data source (such as a file) will result in a slow map phase, because the file will be processed sequentially.

The TextInputFormat class (view source at http://mng.bz/h728) provides an implementation of the InputFormat class’s createRecordReader method, but it delegates the calculation of input splits to its parent class, FileInputFormat. The following code shows the relevant parts of the TextInputFormat class:
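(A sketch based on the Hadoop 2 source; the configurable record delimiter and compression details are trimmed.)

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    // Delegate record reading to LineRecordReader, which emits one
    // key/value pair per line of text.
    return new LineRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // A text file can be split unless it's compressed with a codec
    // that doesn't support splitting.
    CompressionCodec codec =
        new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (codec == null) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }
}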

The code in FileInputFormat (source at http://mng.bz/CZB8) that determines the input splits is a little more complicated. A simplified form of the code is shown in the following example to portray the main elements of the getSplits method:
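(Again a sketch: block-location lookups and the split-slop factor in the real code are omitted.)

public List<InputSplit> getSplits(JobContext job) throws IOException {
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  for (FileStatus file : listStatus(job)) {    // every file under the input paths
    Path path = file.getPath();
    long length = file.getLen();

    if (length != 0 && isSplitable(job, path)) {
      // The split size defaults to the block size, bounded by the
      // configured minimum and maximum split sizes.
      long splitSize = computeSplitSize(file.getBlockSize(), minSize, maxSize);

      long bytesRemaining = length;
      while (bytesRemaining > splitSize) {
        // Host locations are omitted in this sketch.
        splits.add(makeSplit(path, length - bytesRemaining, splitSize, null));
        bytesRemaining -= splitSize;
      }
      if (bytesRemaining != 0) {
        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, null));
      }
    } else {
      // Unsplittable (or empty) files produce a single split.
      splits.add(makeSplit(path, 0, length, null));
    }
  }
  return splits;
}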

The following code shows how you can specify the InputFormat to use for a MapReduce job:

job.setInputFormatClass(TextInputFormat.class);

RecordReader

You’ll create and use the RecordReader class in the map tasks to read data from an input split and to provide each record in the form of a key/value pair for use by mappers. A task is commonly created for each input split, and each task has a single RecordReader that’s responsible for reading the data for that input split. Figure 3.4 shows the abstract methods you must implement.

Figure 3.4. The annotated RecordReader class and its abstract methods

As shown previously, the TextInputFormat class creates a LineRecordReader to read records from the input splits. The LineRecordReader directly extends the RecordReader class and uses the LineReader class to read lines from the input split. The LineRecordReader uses the byte offset in the file for the map key, and the contents of the line for the map value. The following example shows a simplified version of the LineRecordReader (source at http://mng.bz/mYO7):
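(A sketch with compression support and most error handling removed.)

public class LineRecordReader extends RecordReader<LongWritable, Text> {

  private long start;    // byte offset at which this split begins
  private long pos;      // current position in the file
  private long end;      // byte offset at which this split ends
  private LineReader in;
  private LongWritable key = new LongWritable();
  private Text value = new Text();

  @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext context)
      throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    start = split.getStart();
    end = start + split.getLength();
    Path file = split.getPath();

    FSDataInputStream fileIn =
        file.getFileSystem(context.getConfiguration()).open(file);
    fileIn.seek(start);
    in = new LineReader(fileIn, context.getConfiguration());

    // If this isn't the first split, discard the first (partial) line;
    // it belongs to the previous split, whose reader reads past its end.
    if (start != 0) {
      start += in.readLine(new Text());
    }
    pos = start;
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    key.set(pos);                 // the key is the byte offset of the line
    int newSize = 0;
    if (pos <= end) {             // read one line past the split boundary if needed
      newSize = in.readLine(value);
      pos += newSize;
    }
    return newSize != 0;
  }

  @Override
  public LongWritable getCurrentKey() { return key; }

  @Override
  public Text getCurrentValue() { return value; }

  @Override
  public float getProgress() {
    return start == end ? 0.0f
        : Math.min(1.0f, (pos - start) / (float) (end - start));
  }

  @Override
  public void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }
}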

Because the LineReader class is easy, we’ll skip that code. The next step is to look at how MapReduce supports data outputs.

3.1.2. Data output

MapReduce uses similar processes for supporting both output and input data. Two classes must exist: an OutputFormat and a RecordWriter. The OutputFormat performs some basic validation of the data sink properties, and the RecordWriter writes each reducer output to the data sink.

OutputFormat

Much like the InputFormat class, the OutputFormat class, as shown in figure 3.5, defines the contracts that implementers must fulfill: checking the information related to the job output, providing a RecordWriter, and specifying an output committer, which allows writes to be staged and then made “permanent” upon task or job success. (Output committing is covered in section 3.5.2.)

Figure 3.5. The annotated OutputFormat class

Just like the TextInputFormat, the TextOutputFormat also extends a base class, FileOutputFormat, which takes care of some complicated logistics such as output committing, which we’ll cover later in this chapter. For now, let’s take a look at the work that TextOutputFormat performs (source at http://mng.bz/lnR0):
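(A simplified sketch; compression handling is omitted.)

public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {

  @Override
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();

    // The key/value separator defaults to a tab character.
    String separator =
        conf.get("mapreduce.output.textoutputformat.separator", "\t");

    // FileOutputFormat hands back a work file inside the output committer's
    // temporary directory; the committer promotes it when the task succeeds.
    Path file = getDefaultWorkFile(job, "");
    FSDataOutputStream out = file.getFileSystem(conf).create(file, false);

    return new LineRecordWriter<K, V>(out, separator);
  }
}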

The following code shows how you can specify the OutputFormat that should be used for a MapReduce job:

job.setOutputFormatClass(TextOutputFormat.class);

RecordWriter

You’ll use the RecordWriter to write the reducer outputs to the destination data sink. It’s a simple class, as figure 3.6 illustrates.

Figure 3.6. The annotated RecordWriter class overview

The TextOutputFormat returns a LineRecordWriter object, which is an inner class of TextOutputFormat, to perform the writing to the file. A simplified version of that class (source at http://mng.bz/lnR0) is shown in the following example:
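(A simplified sketch of the inner class.)

protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {

  private static final byte[] NEWLINE =
      "\n".getBytes(StandardCharsets.UTF_8);

  protected DataOutputStream out;
  private final byte[] keyValueSeparator;

  public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
    this.out = out;
    this.keyValueSeparator =
        keyValueSeparator.getBytes(StandardCharsets.UTF_8);
  }

  private void writeObject(Object o) throws IOException {
    if (o instanceof Text) {
      Text t = (Text) o;
      out.write(t.getBytes(), 0, t.getLength());    // write the Text bytes directly
    } else {
      out.write(o.toString().getBytes(StandardCharsets.UTF_8));
    }
  }

  @Override
  public synchronized void write(K key, V value) throws IOException {
    boolean nullKey = key == null || key instanceof NullWritable;
    boolean nullValue = value == null || value instanceof NullWritable;
    if (nullKey && nullValue) {
      return;
    }
    if (!nullKey) {
      writeObject(key);
    }
    if (!nullKey && !nullValue) {
      out.write(keyValueSeparator);                 // separator between key and value
    }
    if (!nullValue) {
      writeObject(value);
    }
    out.write(NEWLINE);                             // one record per line
  }

  @Override
  public synchronized void close(TaskAttemptContext context) throws IOException {
    out.close();
  }
}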

Whereas on the map side it’s the InputFormat that determines how many map tasks are executed, on the reducer side the number of tasks is solely based on the value for mapred.reduce.tasks set by the client (or if it isn’t set, the value is picked up from mapred-site.xml, or from mapred-default.xml if it doesn’t exist in the site file).
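For example, the client can fix the reducer count when building the job:

job.setNumReduceTasks(10);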

Now that you know what’s involved in working with input and output data in MapReduce, it’s time to apply that knowledge to solving some common data-serialization problems. Your first step in this journey is to learn how to work with common file formats such as XML.

3.2. Processing common serialization formats

XML and JSON are industry-standard data interchange formats. Their ubiquity in the technology industry is evidenced by their heavy adoption in data storage and exchange. In this section we’ll look at how you can read and write these data formats in MapReduce.

3.2.1. XML

XML has existed since 1998 as a mechanism to represent data that’s readable by machine and human alike. It became a universal language for data exchange between systems and is employed by many standards today, such as SOAP and RSS, and it’s used as an open data format for products such as Microsoft Office.

Technique 8 MapReduce and XML

MapReduce comes bundled with an InputFormat that works with text, but it doesn’t come with one that supports XML. Working on a single XML file in parallel in MapReduce is tricky because XML doesn’t contain a synchronization marker in its data format.[2]

2 A synchronization marker is typically some binary data used to demarcate record boundaries. It allows a reader to perform a random seek into a file and determine where the next record starts by reading until a synchronization marker is found.

Problem

You want to work with large XML files in MapReduce and be able to split and process them in parallel.

Solution

Mahout’s XMLInputFormat can be used to work with XML files in HDFS with MapReduce. It reads records that are delimited by specific XML begin and end tags. This technique also explains how XML can be emitted as output in MapReduce output.

Discussion

MapReduce doesn’t contain built-in support for XML, so we’ll turn to another Apache project—Mahout, a machine learning system—to provide an XML InputFormat. To showcase the XML InputFormat, you can write a MapReduce job that uses Mahout’s XML input format to read property names and values from Hadoop’s configuration files. The first step is to set up the job configuration:
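A sketch of the driver setup follows; the map-only configuration is an assumption of this sketch, and the complete driver is in the GitHub source referenced in this technique:

Configuration conf = super.getConf();

// Tell Mahout's XML input format which tags delimit a record. Here each
// record is a single <property> element from a Hadoop configuration file.
conf.set("xmlinput.start", "<property>");
conf.set("xmlinput.end", "</property>");

Job job = Job.getInstance(conf);
job.setJarByClass(XMLMapReduceReader.class);

job.setInputFormatClass(XmlInputFormat.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);          // map-only: the mapper emits the final output

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);

job.waitForCompletion(true);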

Mahout’s XML input format is rudimentary; you need to tell it the exact start and end XML tags that will be searched for in the file, and files are split (and records extracted) using the following approach:

1. Files are split into discrete sections along HDFS block boundaries for data locality.

2. Each map task operates on a specific input split. The map task seeks to the start of the input split, and then continues to process the file until it hits the first xmlinput.start.

3. The content between xmlinput.start and xmlinput.end is repeatedly emitted until the end of the input split is reached.

Next you need to write a mapper to consume Mahout’s XML input format. The XML element in Text form has been supplied, so you’ll need to use an XML parser to extract content from the XML.[3]

3 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/xml/XMLMapReduceReader.java.

Listing 3.1. Extracting content with Java’s STAX parser

public static class Map extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String document = value.toString();
    System.out.println("'" + document + "'");
    try {
      XMLStreamReader reader = XMLInputFactory.newInstance()
          .createXMLStreamReader(new ByteArrayInputStream(document.getBytes()));

      String propertyName = "";
      String propertyValue = "";
      String currentElement = "";

      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
          case XMLStreamConstants.START_ELEMENT:
            currentElement = reader.getLocalName();
            break;
          case XMLStreamConstants.CHARACTERS:
            if (currentElement.equalsIgnoreCase("name")) {
              propertyName += reader.getText();
            } else if (currentElement.equalsIgnoreCase("value")) {
              propertyValue += reader.getText();
            }
            break;
        }
      }
      reader.close();

      context.write(new Text(propertyName.trim()),
          new Text(propertyValue.trim()));
    } catch (Exception e) {
      log.error("Error processing '" + document + "'", e);
    }
  }
}

The map is given a Text instance, which contains a String representation of the data between the start and end tags. In this code, you use Java’s built-in Streaming API for XML (StAX) parser to extract the key and value for each property and output them.

If you run the MapReduce job against Cloudera’s core-site.xml and use the HDFS cat command to show the output, you’ll see the following:

$ hadoop fs -put $HADOOP_HOME/conf/core-site.xml core-site.xml

$ hip hip.ch3.xml.XMLMapReduceReader \

--input core-site.xml \

--output output

$ hadoop fs -cat output/part*

fs.default.name hdfs://localhost:8020

hadoop.tmp.dir /usr/local/hadoop/tmp

...

This output shows that you’ve successfully worked with XML as an input serialization format with MapReduce. Not only that, you can support huge XML files because the input format supports splitting XML.

Writing XML

Having successfully read XML, the next question is how to write XML. In your reducer, you have callbacks that occur before and after your main reduce method is called, which you can use to emit a start and end tag, as shown in the following example.[4]

4 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/xml/XmlMapReduceWriter.java.

Listing 3.2. A reducer to emit start and end tags
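The following is a sketch of the approach; the root element name and the use of a null output value are assumptions here, and the full listing is at the GitHub link above.

public static class Reduce extends Reducer<Text, Text, Text, Text> {

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // Called once before the first reduce invocation: emit the root start tag.
    // TextOutputFormat skips null values, so only the XML text is written.
    context.write(new Text("<configuration>"), null);
  }

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      // Wrap each key/value pair in a <property> element.
      context.write(new Text("<property><name>" + key + "</name><value>"
          + value + "</value></property>"), null);
    }
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // Called once after the last reduce invocation: emit the root end tag.
    context.write(new Text("</configuration>"), null);
  }
}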

This could also be embedded in an OutputFormat, but I’ll leave that as a project for you to experiment with. Writing an OutputFormat class is covered in section 3.5.1.

Pig

If you want to work with XML in Pig, the Piggy Bank library[5] (a user-contributed library of useful Pig code) contains an XMLLoader. It works much like this technique and captures all of the content between a start and end tag, supplying it as a single byte array field in a Pig tuple.

5 Piggy Bank—user-defined pig functions: https://cwiki.apache.org/confluence/display/PIG/PiggyBank.

Hive

Currently, no means exist for working with XML in Hive. You’d have to write a custom SerDe, which we’ll cover in chapter 9. [6]

6 SerDe is a shortened form of Serializer/Deserializer; it’s the mechanism that allows Hive to read and write data in HDFS.

Summary

Mahout’s XmlInputFormat certainly helps you work with XML. But it’s sensitive to an exact string match of both the start and end element names. If the element tag can contain attributes with variable values, or if the generation of the element can’t be controlled and could result in XML namespace qualifiers being used, then this approach may not work for you. Also problematic will be situations where the element name you specify is used as a descendant child element.

If you have control over the XML laid out in the input, this exercise can be simplified by having a single XML element per line. This will let you use the built-in MapReduce text-based input formats (such as TextInputFormat), which treat each line as a record and split to preserve that demarcation.

Another option worth considering is that of a preprocessing step, where you could convert the original XML into a separate line per XML element, or convert it into an altogether different data format, such as a SequenceFile or Avro, both of which solve the splitting problem for you.

Now that you have a handle on how to work with XML, let’s tackle another popular serialization format, JSON.

3.2.2. JSON

JSON shares the machine- and human-readable traits of XML and has existed since the early 2000s. It’s less verbose than XML, and it doesn’t have the rich typing and validation features available in XML.

Technique 9 MapReduce and JSON

Imagine you have some code that’s downloading JSON data from a streaming REST service, and every hour it writes a file into HDFS. The amount of data being downloaded is large, so each file produced is multiple gigabytes in size.

You’ve been asked to write a MapReduce job that can take as input these large JSON files. What you have here is a problem in two parts: first, MapReduce doesn’t come with an InputFormat that works with JSON; second, how does one even go about splitting JSON?

Figure 3.7 shows the problem with splitting JSON. Imagine that MapReduce created a split as shown in the figure. The map task that operates on this input split will perform a seek to the start of the input split, and then needs to determine the start of the next record. With file formats such as JSON and XML, it’s challenging to know when the next record starts due to the lack of a synchronization marker, or any other indicator that identifies the start of a record.

Figure 3.7. Example of issue with JSON and multiple input splits

JSON is harder to partition into distinct segments than a format such as XML because JSON doesn’t have a token (like an end tag in XML) to denote the start or end of a record.

Problem

You want to work with JSON inputs in MapReduce, and also to ensure that input JSON files can be partitioned for concurrent reads.

Solution

The Elephant Bird LzoJsonInputFormat input format is used as a basis to create an input format class to work with JSON elements. This technique also discusses another approach using my open source project that can work with multiline JSON.

Discussion

Elephant Bird (https://github.com/kevinweil/elephant-bird), an open source project that contains useful utilities for working with LZOP compression, has an LzoJsonInputFormat that can read JSON, though it requires that the input file be LZOP-compressed. You can use the Elephant Bird code as a template for your own JSON InputFormat that doesn’t have the LZOP compression requirement.

This solution assumes that each JSON record is on a separate line. The JSON input format is simple and does nothing other than construct and return a JSON record reader, so we’ll skip over that code. That record reader emits LongWritable, MapWritable key/value pairs to the mapper, where the MapWritable is a map of JSON element names and their values.

Let’s take a look at how this RecordReader works. It uses the LineRecordReader, which is a built-in MapReduce reader that emits a record for each line. To convert the line to a MapWritable, the reader uses the following method:[7]

7 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/json/JsonInputFormat.java.

public static boolean decodeLineToJson(JSONParser parser, Text line,
                                       MapWritable value) {
  try {
    JSONObject jsonObj = (JSONObject) parser.parse(line.toString());
    for (Object key : jsonObj.keySet()) {
      Text mapKey = new Text(key.toString());
      Text mapValue = new Text();
      if (jsonObj.get(key) != null) {
        mapValue.set(jsonObj.get(key).toString());
      }
      value.put(mapKey, mapValue);
    }
    return true;
  } catch (ParseException e) {
    LOG.warn("Could not json-decode string: " + line, e);
    return false;
  } catch (NumberFormatException e) {
    LOG.warn("Could not parse field into number: " + line, e);
    return false;
  }
}

The reader uses the json-simple parser (http://code.google.com/p/json-simple/) to parse the line into a JSON object, and then iterates over the keys in the JSON object and puts them, along with their associated values, into a MapWritable. The mapper is given the JSON data in LongWritable, MapWritable pairs and can process the data accordingly.

The following shows an example JSON object:

{
  "results" : [
    {
      "created_at" : "Thu, 29 Dec 2011 21:46:01 +0000",
      "from_user" : "grep_alex",
      "text" : "RT @kevinweil: After a lot of hard work by ..."
    },
    {
      "created_at" : "Mon, 26 Dec 2011 21:18:37 +0000",
      "from_user" : "grep_alex",
      "text" : "@miguno pull request has been merged, thanks again!"
    }
  ]
}

This technique assumes one JSON object per line. The following code shows the JSON file you’ll work with in this example:

{"created_at" : "Thu, 29 Dec 2011 21:46:01 +0000","from_user" : ...

{"created_at" : "Mon, 26 Dec 2011 21:18:37 +0000","from_user" : ...

Now copy the JSON file into HDFS and run your MapReduce code. The MapReduce code writes each JSON key/value pair to the output:

$ hadoop fs -put test-data/json/tweets.json tweets.json

$ hip hip.ch3.json.JsonMapReduce \

--input tweets.json \

--output output

$ hadoop fs -cat output/part*

text RT @kevinweil: After a lot of hard work by ...

from_user grep_alex

created_at Thu, 29 Dec 2011 21:46:01 +0000

text @miguno pull request has been merged, thanks again!

from_user grep_alex

created_at Mon, 26 Dec 2011 21:18:37 +0000

Writing JSON

An approach similar to what we looked at in section 3.2.1 for writing XML could also be used to write JSON.

Pig

Elephant Bird contains a JsonLoader and an LzoJsonLoader, which you can use to work with JSON in Pig. These loaders work with line-based JSON. Each Pig tuple contains a chararray field for each JSON element in the line.

Hive

Hive contains a DelimitedJSONSerDe class which can serialize JSON, but unfortunately can’t deserialize it, so you can’t load data into Hive using this SerDe.

Summary

This solution assumes that the JSON input is structured with one line per JSON object. How would you work with JSON objects that were across multiple lines? An experimental project on GitHub[8] works with multiple input splits over a single JSON file. This approach searches for a specific JSON member and retrieves the containing object.

8 A multiline JSON InputFormat: https://github.com/alexholmes/json-mapreduce.

You can also review a Google Code project called hive-json-serde (http://code.google.com/p/hive-json-serde/), which can support both serialization and deserialization.

As you can see, using XML and JSON in MapReduce is kludgy and has rigid requirements about how to lay out your data. Support for these two formats in MapReduce is also complex and error-prone, because neither lends itself naturally to splitting. Clearly, you need to look at alternative file formats that have built-in support for splittability.

The next step is to look at more sophisticated file formats that are better suited to working with MapReduce, such as Avro and SequenceFile.

3.3. Big data serialization formats

Unstructured text works well when you’re working with scalar or tabular data. Semistructured text formats such as XML and JSON can model more sophisticated data structures that include composite fields or hierarchical data. But when you’re working with big data volumes, you’ll need serialization formats with compact serialized forms that natively support partitioning and have schema evolution features.

In this section we’ll compare the serialization formats that work best with big data in MapReduce and follow up with how you can use them with MapReduce.

3.3.1. Comparing SequenceFile, Protocol Buffers, Thrift, and Avro

In my experience, the following characteristics are important when selecting a data serialization format:

· Code generation —Some serialization formats are accompanied by libraries with code-generation abilities that allow you to generate rich objects, making it easier for you to interact with your data. The generated code also provides the added benefit of type-safety to make sure that your consumers and producers are working with the right data types.

· Schema evolution —Data models evolve over time, and it’s important that your data formats support your need to modify your data models. Schema evolution allows you to add, modify, and in some cases delete attributes, while at the same time providing backward and forward compatibility for readers and writers.

· Language support —It’s likely that you’ll need to access your data in more than one programming language, and it’s important that the mainstream languages have support for a data format.

· Transparent compression —Data compression is important given the volumes of data you’ll work with, and a desirable data format has the ability to internally compress and decompress data on writes and reads. It’s a much bigger headache for you as a programmer if the data format doesn’t support compression, because it means that you’ll have to manage compression and decompression as part of your data pipeline (as is the case when you’re working with text-based file formats).

· Splittability —Newer data formats understand the importance of supporting multiple parallel readers that are reading and processing different chunks of a large file. It’s crucial that file formats contain synchronization markers (and thereby support the ability for a reader to perform a random seek and scan to the start of the next record).

· Support in MapReduce and the Hadoop ecosystem —A data format that you select must have support in MapReduce and other critical Hadoop ecosystem projects, such as Hive. Without this support, you’ll be responsible for writing the code to make a file format work with these systems.

Table 3.1 compares the more popular data serialization frameworks to see how they stack up against each other. Additional background on these technologies is provided in the following discussion.

Table 3.1. Feature comparison of data serialization frameworks

Library            Code generation   Schema      Language support                     Transparent   Splittable   Native support   Pig and Hive
                                     evolution                                        compression                in MapReduce     support

SequenceFile       No                No          Java, Python                         Yes           Yes          Yes              Yes
Protocol Buffers   Yes (optional)    Yes         C++, Java, Python, Perl, Ruby        No            No           No               No
Thrift             Yes (mandatory)   Yes         C, C++, Java, Python, Ruby, Perl     No[a]         No           No               No
Avro               Yes (optional)    Yes         C, C++, Java, Python, Ruby, C#       Yes           Yes          Yes              Yes
Parquet            No                Yes         Java, Python (C++ planned in 2.0)    Yes           Yes          Yes              Yes

a. Thrift does support compression, but not in the Java library.

Let’s look at each of these formats in more detail.

SequenceFile

The SequenceFile format was created to work with MapReduce, Pig, and Hive, and therefore integrates well with all of those tools. Its shortcomings are mainly its lack of code generation and versioning support, as well as limited language support.

Protocol Buffers

The Protocol Buffers format has been used heavily by Google for interoperability. Its strengths are its versioning support and compact binary format. Downsides include its lack of support in MapReduce (or in any third-party software) for reading files generated by Protocol Buffers serialization. Not all is lost, however; we’ll look at how Elephant Bird uses Protocol Buffers serialization within a higher-level container file in section 3.3.3.

Thrift

Thrift was developed at Facebook as a data-serialization and RPC framework. It doesn’t have support in MapReduce for its native data-serialization format, but it can support different wire-level data representations, including JSON and various binary encodings. Thrift also includes an RPC layer with various types of servers, including a nonblocking implementation. We’ll ignore the RPC capabilities for this chapter and focus on the data serialization.

Avro

The Avro format is Doug Cutting’s creation to help address the shortcomings of SequenceFile.

Parquet

Parquet is a columnar file format with rich Hadoop system support, and it works well with data models such as Avro, Protocol Buffers, and Thrift. Parquet is covered in depth in section 3.4.

Based on these evaluation criteria, Avro seems to be the best fit as a data serialization framework in Hadoop. SequenceFile is a close second due to its inherent compatibility with Hadoop (it was designed for use with Hadoop).

You can review a useful jvm-serializers project at https://github.com/eishay/jvm-serializers/wiki/, which runs various benchmarks to compare file formats based on items such as serialization and deserialization times. It contains benchmarks for Avro, Protocol Buffers, and Thrift, along with a number of other frameworks.

After looking at how the various data-serialization frameworks compare, we’ll dedicate the next few sections to working with them. We’ll start off with a look at SequenceFile.

3.3.2. SequenceFile

Because SequenceFile was created for use with MapReduce, this format arguably offers the highest level of integration support in conjunction with MapReduce, Pig, and Hive. SequenceFile is a splittable binary file format that stores data in the form of key/value pairs. All SequenceFiles share the same header format, as shown in figure 3.8.

Figure 3.8. SequenceFile header format

SequenceFiles come in three types, which vary based on how you apply compression. In addition, each type has its own corresponding Writer classes.

Uncompressed

Uncompressed SequenceFiles are written using the SequenceFile.Writer class. No advantage exists for this over the compressed formats, because compression generally reduces your storage footprint and is more efficient for reads and writes. The file format is shown in figure 3.9.

Figure 3.9. File format for record-compressed and uncompressed SequenceFiles

Record-compressed

Record-compressed SequenceFiles are written using the SequenceFile.RecordCompressWriter class. When a record is added to the SequenceFile, it’s immediately compressed and written to the file. The disadvantage of this approach is that your compression ratio will suffer compared to block compression. This file format, which is essentially the same as that of uncompressed SequenceFiles, is shown in figure 3.9.

Block-compressed

Block-compressed SequenceFiles are written using the SequenceFile.BlockCompressWriter class. By default, the block size is the same as the HDFS block size, although this can be overridden. The advantage to this compression is that it’s more aggressive; the whole block is compressed, rather than compressing at the record level. Data isn’t written until it reaches the block size, at which point the whole block is compressed, resulting in good overall compression. The file format is shown in figure 3.10.

Figure 3.10. Block-compressed SequenceFile format

You only need one Reader class (SequenceFile.Reader) to read all three types of SequenceFiles. Even the Writer is abstracted, because you can call SequenceFile.createWriter to choose the preferred format, and it returns a base class that can be used for writing regardless of compression.

SequenceFiles have a pluggable serialization framework. Written keys and values must have a related org.apache.hadoop.io.serializer.Serializer and Deserializer for marshaling and unmarshaling. Hadoop comes with four serializers: Avro, Java, Tether (for binary data contained within a TetherData class), and Writable (the default serializer).[9]

9 Writable is an interface in Hadoop used to support general-purpose data serialization, and it’s used for sending data across the wire between Hadoop components. Yahoo has a good introduction to Writables at http://developer.yahoo.com/hadoop/tutorial/module5.html#writable.

Custom SequenceFile serialization

If you want your SequenceFile to contain objects that aren’t Writable or Serializable, you’ll need to implement your own Serializer and register it. You register it by updating core-site.xml and appending the class name of the custom serialization implementation to the io.serializations property.
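For example, a hypothetical com.example.MyCustomSerialization implementation would be appended like this (only the default WritableSerialization is shown alongside it for brevity):

<property>
  <name>io.serializations</name>
  <value>org.apache.hadoop.io.serializer.WritableSerialization,com.example.MyCustomSerialization</value>
</property>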

SequenceFiles are splittable because a synchronization marker is written approximately every 6 KiB (1 kibibyte = 1024 bytes) in the file for record-based files, and before every block for block-based files.

Now let’s look at how to use SequenceFiles in MapReduce.

Technique 10 Working with SequenceFiles

Working with text in MapReduce can start to get tricky when you have to support complex types of data, which may include nonscalar data types such as lists or dictionaries. In addition, large compressed text files require some additional wrangling if MapReduce’s data locality properties are important to you. These challenges can be overcome by using a file format such as SequenceFile.

Problem

You want to work with a structured file format in MapReduce that you can use to model complex data structures and that also supports compression and splittable inputs.

Solution

This technique looks at how the SequenceFile file format can be used from both standalone applications and MapReduce.

Discussion

The SequenceFile format offers a high level of integration with computational tools such as MapReduce and can also model complex data structures. We’ll examine how to read and write SequenceFiles, and also how to use them with MapReduce, Pig, and Hive.

We’ll work with the stock data for this technique. The most common serialization method used with SequenceFiles is Writable, so you’ll need to create a Writable to represent the stock data. The key elements of writing a complex Writable are implementing the Writable interface and defining its serialization and deserialization methods, as shown in the following listing.[10]

10 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/StockPriceWritable.java.

Listing 3.3. A Writable implementation to represent a stock price
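A sketch of the Writable follows; getters, setters, and toString are omitted, and the full class is in the GitHub source in the footnote above.

public class StockPriceWritable implements Writable {

  private String symbol;
  private String date;
  private double open;
  private double high;
  private double low;
  private double close;
  private int volume;
  private double adjClose;

  @Override
  public void write(DataOutput out) throws IOException {
    // Serialization: write each field to the stream in a fixed order.
    WritableUtils.writeString(out, symbol);
    WritableUtils.writeString(out, date);
    out.writeDouble(open);
    out.writeDouble(high);
    out.writeDouble(low);
    out.writeDouble(close);
    out.writeInt(volume);
    out.writeDouble(adjClose);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // Deserialization: read the fields back in exactly the same order.
    symbol = WritableUtils.readString(in);
    date = WritableUtils.readString(in);
    open = in.readDouble();
    high = in.readDouble();
    low = in.readDouble();
    close = in.readDouble();
    volume = in.readInt();
    adjClose = in.readDouble();
  }

  // Getters, setters, and toString omitted for brevity.
}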

Now that you have your Writable, you’ll need to write some code that will create a SequenceFile. You’ll read the stocks file from the local disk, create the StockPriceWritable, and write it to your SequenceFile, using the stock symbol as your key:[11]

11 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/writable/SequenceFileStockWriter.java.
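A sketch of the writer follows; the CSV-parsing helper is assumed, and the full driver is in the GitHub source above.

Configuration conf = super.getConf();

SequenceFile.Writer writer =
    SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(outputPath),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(StockPriceWritable.class),
        SequenceFile.Writer.compression(
            SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
try {
  Text key = new Text();

  // inputFile is a java.io.File pointing at the local stocks CSV.
  for (String line : FileUtils.readLines(inputFile)) {
    StockPriceWritable stock = StockPriceWritable.fromLine(line); // assumed CSV-parsing helper
    key.set(stock.getSymbol());        // the stock symbol is the key
    writer.append(key, stock);
  }
} finally {
  writer.close();
}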

Great! Now how do you go about reading the files created with your writer?[12]

12 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/writable/SequenceFileStockReader.java.
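And a sketch of the reader (again, the full source is at the link above):

Configuration conf = super.getConf();

SequenceFile.Reader reader =
    new SequenceFile.Reader(conf, SequenceFile.Reader.file(inputPath));
try {
  Text key = new Text();
  StockPriceWritable value = new StockPriceWritable();

  // next() returns false once the end of the file is reached.
  while (reader.next(key, value)) {
    System.out.println(key + "," + value);
  }
} finally {
  reader.close();
}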

Now you need to prove that it works by writing and reading a file:

$ cat test-data/stocks.txt

AAPL,2009-01-02,85.88,91.04,85.16,90.75,26643400,90.75

AAPL,2008-01-02,199.27,200.26,192.55,194.84,38542100,194.84

AAPL,2007-01-03,86.29,86.58,81.90,83.80,44225700,83.80

...

$ hip hip.ch3.seqfile.writable.SequenceFileStockWriter \

--input test-data/stocks.txt \

--output stocks.seqfile

$ hip hip.ch3.seqfile.writable.SequenceFileStockReader \

--input stocks.seqfile

AAPL,StockPriceWritable[symbol=AAPL,date=2009-01-02,open=85.88,...]

AAPL,StockPriceWritable[symbol=AAPL,date=2008-01-02,open=199.27,...]

AAPL,StockPriceWritable[symbol=AAPL,date=2007-01-03,open=86.29,...]

...

How would you process this SequenceFile in MapReduce? Luckily, both SequenceFileInputFormat and SequenceFileOutputFormat integrate nicely with MapReduce. Remember earlier in this chapter when we talked about how the default SequenceFile serialization supports Writable classes for serialization? Because Writable is the native data format in MapReduce, using SequenceFiles with MapReduce is totally transparent. See if you agree. The following code shows a MapReduce job with an identity mapper and reducer:[13], [14]

13 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/writable/SequenceFileStockMapReduce.java.

14 An identity function is a mathematical term to denote a function that returns the same value that was used as its argument. In MapReduce this means the same thing—the map identity function emits all the key/value pairs that it is supplied, as does the reducer, without any transformation or filtering. A job that doesn’t explicitly set a map or reduce class results in Hadoop using a built-in identity function.
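A sketch of the driver follows; the compression settings are optional, and the full version is in the GitHub source above.

Configuration conf = super.getConf();

Job job = Job.getInstance(conf);
job.setJarByClass(SequenceFileStockMapReduce.class);

// No map or reduce class is set, so Hadoop falls back to its identity
// implementations.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(StockPriceWritable.class);

// Tell MapReduce that both the input and the output are SequenceFiles.
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);

SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job,
    SequenceFile.CompressionType.BLOCK);
SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);

job.waitForCompletion(true);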

Now you can run the identity MapReduce job against the stocks SequenceFile that you created earlier in this technique:

$ hip hip.ch3.seqfile.writable.SequenceFileStockMapReduce \

--input stocks.seqfile \

--output output

Because all it’s doing is echoing the input to the output, you should see identical content in both files. You can make sure that’s the case by reading in the job output file.

First of all, how do you verify that the output is a SequenceFile? Easy, just cat it—the first three bytes of a SequenceFile are SEQ, followed by a fourth byte containing the SequenceFile version, which is then followed by the key and value classes:
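(The exact bytes will vary, and nonprintable characters aren't shown here.)

$ hadoop fs -cat output/part-r-00000
SEQ org.apache.hadoop.io.Text hip.ch3.StockPriceWritable ...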

Looks good. Now try using the SequenceFile reader code you wrote earlier to dump it to standard output:

$ hip hip.ch3.seqfile.writable.SequenceFileStockReader \

--input output/part-r-00000

AAPL,StockPriceWritable[symbol=AAPL,date=2008-01-02,open=199.27,...]

AAPL,StockPriceWritable[symbol=AAPL,date=2007-01-03,open=86.29,...]

AAPL,StockPriceWritable[symbol=AAPL,date=2009-01-02,open=85.88,...]

...

That was easy. Because SequenceFiles are key/value-based, and the default serialization data format for SequenceFiles is Writable, the use of SequenceFiles is completely transparent to your map and reduce classes. We demonstrated this by using MapReduce’s built-in identity map and reduce classes with the SequenceFile as input. The only work you had to do was to tell MapReduce to use the SequenceFile-specific input and output format classes, which are built into MapReduce.

Reading SequenceFiles in Pig

By writing your own Writable you created more work for yourself with non-MapReduce tools such as Pig. Pig works well with Hadoop’s built-in scalar Writables such as Text and IntWritable, but it has no support for custom Writables: Pig’s SequenceFileLoader won’t work with your StockPriceWritable, so you’ll need to write your own LoadFunc to process your files. The appendix contains details on installing Pig.

The LoadFunc for Pig is straightforward, as can be seen in the following listing.[15]

15 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/writable/SequenceFileStockLoader.java.

Listing 3.4. A Pig loader function that converts a StockPriceWritable into a Pig tuple

public class SequenceFileStockLoader extends FileInputLoadFunc {

  private SequenceFileRecordReader<Text, StockPriceWritable> reader;

  @Override
  public Tuple getNext() throws IOException {
    boolean next;
    try {
      next = reader.nextKeyValue();
    } catch (InterruptedException e) {
      throw new IOException(e);
    }

    if (!next) return null;

    Object value = reader.getCurrentValue();
    if (value == null) {
      return null;
    }
    if (!(value instanceof StockPriceWritable)) {
      return null;
    }

    StockPriceWritable w = (StockPriceWritable) value;
    return TupleFactory.getInstance().newTuple(Arrays.asList(
        w.getSymbol(), w.getDate(), w.getOpen(),
        w.getHigh(), w.getLow(), w.getClose(),
        w.getVolume(), w.getAdjClose()
    ));
  }

  @SuppressWarnings("unchecked")
  @Override
  public InputFormat getInputFormat() throws IOException {
    return new SequenceFileInputFormat<Text, StockPriceWritable>();
  }

  @SuppressWarnings("unchecked")
  @Override
  public void prepareToRead(RecordReader reader, PigSplit split)
      throws IOException {
    this.reader = (SequenceFileRecordReader) reader;
  }

  @Override
  public void setLocation(String location, Job job) throws IOException {
    FileInputFormat.setInputPaths(job, location);
  }
}

Now you can try to load and dump the stock SequenceFile in Pig:

$ pig

grunt> REGISTER $HIP_HOME/*.jar;

grunt> REGISTER $HIP_HOME/lib/*.jar;

grunt> DEFINE SequenceFileStockLoader

hip.ch3.seqfile.writable.SequenceFileStockLoader();

grunt> stocks = LOAD 'stocks.seqfile' USING SequenceFileStockLoader;

grunt> dump stocks;

(AAPL,2009-01-02,85.88,91.04,85.16,90.75,26643400,90.75)

(AAPL,2008-01-02,199.27,200.26,192.55,194.84,38542100,194.84)

(AAPL,2007-01-03,86.29,86.58,81.9,83.8,44225700,83.8)

(AAPL,2006-01-03,72.38,74.75,72.25,74.75,28829800,74.75)

(AAPL,2005-01-03,64.78,65.11,62.6,63.29,24714000,31.65)

...

Hive

Hive contains built-in support for SequenceFiles, but it has two restrictions. First, it ignores the key portion of each record. Second, out of the box it only works with SequenceFile values that are Writable, and it supports them by performing a toString() to convert the value into a Text form.

In our example you have a custom Writable, so you need to write a Hive SerDe that deserializes your Writable into a form Hive can understand. The resulting data definition language (DDL) statement is as follows:[16]

16 The code for StockWritableSerDe is on GitHub at https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/StockWritableSerDe.java.

$ export HADOOP_CLASSPATH=$HIP_HOME/hip-<version>.jar

$ hive

hive> CREATE TABLE stocks (

symbol string,

dates string,

open double,

high double,

low double,

close double,

volume int,

adjClose double

)

ROW FORMAT SERDE 'hip.ch3.StockWritableSerDe'

STORED AS SEQUENCEFILE;

hive> LOAD DATA INPATH 'stocks.seqfile' INTO TABLE stocks;

hive> select * from stocks;

AAPL 2009-01-02 85.88 91.04 85.16 90.75 26643400 90.75

AAPL 2008-01-02 199.27 200.26 192.55 194.84 38542100 194.84

AAPL 2007-01-03 86.29 86.58 81.9 83.8 44225700 83.8

AAPL 2006-01-03 72.38 74.75 72.25 74.75 28829800 74.75

AAPL 2005-01-03 64.78 65.11 62.6 63.29 24714000 31.65

...

We’ll cover custom Hive SerDe examples in more detail in chapter 9.

Summary

SequenceFiles are useful in that they solve two problems that make using MapReduce challenging: they’re natively splittable, and they also have built-in support for compression, which makes it transparent to the user. They’re also useful as containers for other file formats that don’t integrate as well into MapReduce. The thorn in the side of SequenceFiles is that they lack multilanguage support, which limits the range of tools that can interoperate with your data. But if your data mostly stays in HDFS and is processed with MapReduce (or Hive/Pig), SequenceFiles may be just what you’re looking for.

Another challenge for SequenceFiles is their lack of schema evolution when working with Writables—making a change to your Writable won’t be backward or forward compatible unless you build that into your implementation. This can be solved by using Protocol Buffers as your key/value type.

This technique looked at how to use SequenceFiles with Writables, which SequenceFile knows how to encode and decode within its file format. How about making SequenceFiles work with data other than Writables?

Technique 11 Using SequenceFiles to encode Protocol Buffers

Writables are first-class citizens in SequenceFiles, and the APIs have specific methods to read and write Writable instances, which you saw in the previous technique. This doesn’t mean that SequenceFiles are limited to working with Writables—in fact, they can work with any data type as long as there’s a serialization implementation for your data type that plugs into Hadoop’s serialization framework.

Protocol Buffers is a sophisticated data format that Google open-sourced; it provides schema evolution and efficient data-encoding capabilities. (More details on Protocol Buffers are presented in section 3.3.3). In this technique, you’ll implement a Protocol Buffers serialization and see how it allows you to work with native Protocol Buffers objects in MapReduce.

Problem

You want to work with Protocol Buffers data in MapReduce.

Solution

Write a Protocol Buffers serializer, which enables you to encode Protocol Buffers serialized data within SequenceFiles.

Discussion

Hadoop uses its own serialization framework to serialize and deserialize data for performance reasons. An example use of this framework is when map outputs are written to disk as part of the shuffle phase. All map outputs must have a corresponding Hadoop serialization class that knows how to read and write data to a stream. Writables, which are the most commonly used data types in MapReduce, have a WritableSerialization class that uses the readFields and write methods on the Writable interface to perform the serialization.

SequenceFiles use the same serialization framework to serialize and deserialize data within their key/value records, which is why SequenceFiles support Writables out of the box. Therefore, encoding a data type into a SequenceFile is just a matter of writing your own Hadoop serialization instance.

Your first step in getting Protocol Buffers to work with SequenceFiles is to write your own serialization class. Each serialization class must support serialization and deserialization, so let’s start with the serializer, whose job is to write records to an output stream.

The following code uses the MessageLite class as the type; it’s a superclass of all generated Protocol Buffers classes. The MessageLite interface provides methods to write Protocol Buffers to an output stream and read them from an input stream, as you’ll see in the following code:[17]

17 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/protobuf/ProtobufSerialization.java.
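A sketch of the serializer follows; the complete version is in the GitHub source above.

public static class ProtobufSerializer<T extends MessageLite>
    implements Serializer<T> {

  private OutputStream out;

  @Override
  public void open(OutputStream out) throws IOException {
    this.out = out;
  }

  @Override
  public void serialize(T message) throws IOException {
    // writeDelimitedTo prefixes the record with its length, so the
    // deserializer knows how many bytes make up the next record.
    message.writeDelimitedTo(out);
  }

  @Override
  public void close() throws IOException {
    if (out != null) {
      out.close();
    }
  }
}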

Next up is the deserializer, whose job is to populate a Protocol Buffers object from an input stream. Things are a little trickier here than with the serializer, because Protocol Buffers objects can only be constructed via their builder classes:[18]

18 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/protobuf/ProtobufSerialization.java.
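A sketch of the deserializer; looking up the generated class’s static newBuilder() method reflectively is one way to get a builder when no reusable instance is supplied, and is an assumption of this sketch.

public static class ProtobufDeserializer<T extends MessageLite>
    implements Deserializer<T> {

  private final Class<T> clazz;   // the concrete generated Protocol Buffers class
  private InputStream in;

  public ProtobufDeserializer(Class<T> clazz) {
    this.clazz = clazz;
  }

  @Override
  public void open(InputStream in) throws IOException {
    this.in = in;
  }

  @Override
  @SuppressWarnings("unchecked")
  public T deserialize(T message) throws IOException {
    // Protocol Buffers objects are immutable, so a builder is used to
    // construct a new instance from the length-prefixed bytes in the stream.
    MessageLite.Builder builder = getBuilder(message);
    if (!builder.mergeDelimitedFrom(in)) {
      return null;    // end of stream
    }
    return (T) builder.build();
  }

  private MessageLite.Builder getBuilder(T message) throws IOException {
    try {
      if (message != null) {
        return message.newBuilderForType();
      }
      return (MessageLite.Builder) clazz.getMethod("newBuilder").invoke(null);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  @Override
  public void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }
}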

Now you need to configure Hadoop’s serialization framework to use your new serializer. This is accomplished by appending your new serializer to the io.serializations property. It’s usually good to write a helper method to make this easy for clients. The following example shows the standard serializers bundled with Hadoop 2 being appended with the serialization class you just created. The source for ProtobufSerialization isn’t shown here, but all it does is return instances of ProtobufSerializer and ProtobufDeserializer:[19]

19 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/protobuf/ProtobufSerialization.java.

public static void register(Configuration conf) {
  String[] serializations = conf.getStrings("io.serializations");
  if (ArrayUtils.isEmpty(serializations)) {
    serializations = new String[] {
        WritableSerialization.class.getName(),
        AvroSpecificSerialization.class.getName(),
        AvroReflectSerialization.class.getName()
    };
  }
  serializations = (String[]) ArrayUtils.add(
      serializations, ProtobufSerialization.class.getName());
  conf.setStrings("io.serializations", serializations);
}

Next you need to generate a new Protocol Buffers–encoded SequenceFile. The key item here is that you’re calling the register method (shown in the preceding code) prior to using the SequenceFile writer:[20]

20 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/protobuf/SequenceFileProtobufWriter.java.

Configuration conf = super.getConf();

ProtobufSerialization.register(conf);

SequenceFile.Writer writer =
    SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(outputPath),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(Stock.class),
        SequenceFile.Writer.compression(
            SequenceFile.CompressionType.BLOCK, new DefaultCodec()));

Text key = new Text();

for (Stock stock : StockUtils.fromCsvFile(inputFile)) {
  key.set(stock.getSymbol());
  writer.append(key, stock);
}

On to the MapReduce code. What’s great about your new serializer is that the map and reduce classes can work with the Protocol Buffers objects directly. Again, the key thing here is that you’re configuring the job to make available the Protocol Buffers serializer. In the following example you use an identity function to demonstrate how Protocol Buffers objects can be used as first-class citizens in MapReduce when encoded in SequenceFiles:[21]

21 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/protobuf/SequenceFileProtobufMapReduce.java.
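A sketch of the driver follows; the full version is in the GitHub source above.

Configuration conf = super.getConf();
ProtobufSerialization.register(conf);   // make the serializer visible to the job

Job job = Job.getInstance(conf);
job.setJarByClass(SequenceFileProtobufMapReduce.class);

// Identity job: no map or reduce classes are set.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Stock.class);   // the generated Protocol Buffers class

job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);

FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);

job.waitForCompletion(true);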

Now you can write a SequenceFile with Protocol Buffers values, run the identity MapReduce job over that data, and then dump the contents of the job output:

$ hip hip.ch3.seqfile.protobuf.SequenceFileProtobufWriter \

--input test-data/stocks.txt \

--output stocks.pb

$ hip hip.ch3.seqfile.protobuf.SequenceFileProtobufMapReduce \

--input stocks.pb \

--output output

$ hip hip.ch3.seqfile.protobuf.SequenceFileProtobufReader \

--input output/part-r-00000

AAPL,symbol: "AAPL"

date: "2008-01-02"

open: 199.27

...

Next up, we’ll examine additional ways that you can integrate Protocol Buffers into MapReduce.

3.3.3. Protocol Buffers

Google developers invented Protocol Buffers to help them exchange data between services written in multiple languages in a compact and efficient manner. Protocol Buffers is now Google’s de facto format for data—there are over 48,000 different message types defined in Google across more than 12,000 .proto files.[22]

22 Protocol Buffers usage statistics taken from Google’s Protocol Buffers Developer Guide: http://code.google.com/apis/protocolbuffers/docs/overview.html.

A ticket has been open since 2008 with the goal of adding native support for Protocol Buffers in MapReduce,[23] but it has yet to be resolved, so you’ll need to turn to alternative methods of working with Protocol Buffers in Hadoop. The previous technique covered one approach that can be used, which is to encode Protocol Buffers within SequenceFiles. Other options exist, such as using either Elephant Bird[24] or Avro, which support Protocol Buffers by wrapping them within their own file formats. Ultimately, these are all stopgap measures until Protocol Buffers is fully supported in Hadoop.

23 See https://issues.apache.org/jira/browse/MAPREDUCE-377.

24 Using Elephant Bird means you have to use LZOP; ostensibly, it would be possible to derive a version of their classes and remove the LZOP dependency, but it’s probably worth looking elsewhere if you’re not already using LZOP.

There are a number of ways that you can work with Protocol Buffers in Hadoop:

· You can serialize Protocol Buffers objects in binary form within SequenceFiles, as was shown in the previous technique.

· Elephant Bird (https://github.com/kevinweil/elephant-bird), an open source project out of Twitter, supports Protocol Buffers within their own binary file format.

· Parquet, a columnar file format that is covered in section 3.4, has support for the Protocol Buffers object model and allows you to effectively write and read Protocol Buffers into a columnar form.

Of these options, Parquet is the recommended way of working with Protocol Buffers: not only does it allow you to work natively with Protocol Buffers, but it also increases the number of tools that can work with your data, thanks to Parquet’s extensive Hadoop tooling support. This chapter’s coverage of Parquet includes a look at how Avro can be used with Parquet, and Parquet can be used in a similar way to support Protocol Buffers.

Thrift is another data format, which, like Protocol Buffers, doesn’t have out-of-the-box support with MapReduce. Again, you must rely on other tools to work with Thrift data in Hadoop, as you’ll discover in the next section.

3.3.4. Thrift

Facebook created Thrift to help with efficient data representation and transport. Facebook uses Thrift for a number of applications, including search, logging, and its ads platform.

The same three options for working with Protocol Buffers also apply to Thrift, and once again, the recommendation is to use Parquet as the file format. Head on over to the section on Parquet (section 3.4) to learn more about how Parquet integrates with these different data models.

Let’s look at what’s likely the most capable data serialization format of all our options, Avro.

3.3.5. Avro

Doug Cutting created Avro, a data serialization and RPC library, to help improve data interchange, interoperability, and versioning in MapReduce. Avro utilizes a compact binary data format—which you have the option to compress—that results in fast serialization times. Although it has the concept of a schema, similar to Protocol Buffers, Avro improves on Protocol Buffers because its code generation is optional, and it embeds the schema in the container file format, allowing for dynamic discovery and data interactions. Avro has a mechanism to work with schema data that uses generic data types (an example of which can be seen in chapter 4).

The Avro file format is shown in figure 3.11. The schema is serialized as part of the header, which makes deserialization simple and loosens restrictions around users having to maintain and access the schema outside of the Avro data files being interacted with. Each data block contains a number of Avro records, and by default is 16 KB in size.

Figure 3.11. Avro container file format

The holy grail of data serialization supports code generation, versioning, and compression, and has a high level of integration with MapReduce. Equally important is schema evolution, and that’s the reason why Hadoop SequenceFiles aren’t appealing—they don’t support the notion of a schema or any form of data evolution.

In this section you’ll get an overview of Avro’s schema and code-generation capabilities, how to read and write Avro container files, and the various ways Avro can be integrated with MapReduce. At the end we’ll also look at Avro support in Hive and Pig.

Let’s get rolling with a look at Avro’s schema and code generation.

Technique 12 Avro’s schema and code generation

Avro has the notion of generic data and specific data:

· Generic data allows you to work with data at a low level without having to understand schema specifics.

· Specific data allows you to work with Avro using code-generated Avro primitives, which supports a simple and type-safe method of working with your Avro data.

This technique looks at how to work with specific data in Avro.

Problem

You want to define an Avro schema and generate code so you can work with your Avro records in Java.

Solution

Author your schema in JSON form, and then use Avro tools to generate rich APIs to interact with your data.

Discussion

You can use Avro in one of two ways: either with code-generated classes or with its generic classes. In this technique we’ll work with the code-generated classes, but you can see an example of how Avro’s generic records are used in technique 29 in chapter 4.

Getting Avro

The appendix contains instructions on how to get your hands on Avro.

In the code-generated approach, everything starts with a schema. The first step is to create an Avro schema to represent an entry in the stock data:[25]

25 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/stock.avsc.

{

"name": "Stock",

"type": "record",

"namespace": "hip.ch3.avro.gen",

"fields": [

{"name": "symbol", "type": "string"},

{"name": "date", "type": "string"},

{"name": "open", "type": "double"},

{"name": "high", "type": "double"},

{"name": "low", "type": "double"},

{"name": "close", "type": "double"},

{"name": "volume", "type": "int"},

{"name": "adjClose", "type": "double"}

]

}

Avro supports code generation for schema data as well as RPC messages (which aren’t covered in this book). To generate Java code for a schema, use the Avro tools JAR as follows:
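For example, using the Avro tools JAR bundled with the book, a compile invocation looks something like the following (the output directory shown here is just an example; generated sources land under the package-derived directory structure):

$ java -jar $HIP_HOME/lib/avro-tools-1.7.4.jar compile schema \
    src/main/java/hip/ch3/avro/stock.avsc \
    src/main/java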

Generated code will be put into the hip.ch3.avro.gen package. Now that you have generated code, how do you use it to read and write Avro container files?[26]

26 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroStockFileWrite.java.

Listing 3.5. Writing Avro files from outside of MapReduce
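A minimal sketch of such a writer, using Avro’s DataFileWriter and SpecificDatumWriter classes, looks like this (csvFile and outputPath are placeholder variables, and fromCsv is the helper shown next):

Configuration conf = new Configuration();
OutputStream os = FileSystem.get(conf).create(outputPath);

DataFileWriter<Stock> writer =
    new DataFileWriter<Stock>(new SpecificDatumWriter<Stock>(Stock.class));
writer.setCodec(CodecFactory.snappyCodec());   // optional block compression
writer.create(Stock.SCHEMA$, os);              // the schema is written into the header

BufferedReader reader = new BufferedReader(new FileReader(csvFile));
String line;
while ((line = reader.readLine()) != null) {
  writer.append(fromCsv(line));                // one Avro record per input line
}
reader.close();
writer.close();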

As you can see, you can specify the compression codec that should be used to compress the data. In this example you’re using Snappy, which, as shown in chapter 4, is the fastest codec for reads and writes.

The following code example shows how you can marshal a Stock object from a line in the input file. As you can see, the generated Stock class is a POJO with a bunch of setters (and matching getters):

public static Stock fromCsv(String line) throws IOException {

String parts[] = parser.parseLine(line);

Stock stock = new Stock();

stock.setSymbol(parts[0]);

stock.setDate(parts[1]);

stock.setOpen(Double.valueOf(parts[2]));

stock.setHigh(Double.valueOf(parts[3]));

stock.setLow(Double.valueOf(parts[4]));

stock.setClose(Double.valueOf(parts[5]));

stock.setVolume(Integer.valueOf(parts[6]));

stock.setAdjClose(Double.valueOf(parts[7]));

return stock;

}

Now, how about reading the file you just wrote?[27]

27 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroStockFileRead.java.
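A minimal reader sketch, using Avro’s DataFileStream and SpecificDatumReader classes (inputPath is a placeholder variable), looks like this:

FileSystem fs = FileSystem.get(new Configuration());
InputStream is = fs.open(inputPath);

DataFileStream<Stock> reader =
    new DataFileStream<Stock>(is, new SpecificDatumReader<Stock>(Stock.class));

for (Stock stock : reader) {      // the schema is read from the container file header
  System.out.println(stock);      // specific records render as JSON via toString()
}
reader.close();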

Go ahead and execute this writer and reader pair:
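The exact invocation depends on the book’s hip launcher script, but based on the class names above it looks something like this (the stocks.avro output name is an assumption that matches the commands that follow):

$ hip hip.ch3.avro.AvroStockFileWrite \
--input test-data/stocks.txt \
--output stocks.avro

$ hip hip.ch3.avro.AvroStockFileRead \
--input stocks.avro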

Avro comes bundled with some tools to make it easy to examine the contents of Avro files. To view the contents of an Avro file as JSON, simply run this command:

$ java -jar $HIP_HOME/lib/avro-tools-1.7.4.jar tojson stocks.avro

{"symbol":"AAPL","date":"2009-01-02","open":85.88,"high":91.04,...

{"symbol":"AAPL","date":"2008-01-02","open":199.27,"high":200.26,...

{"symbol":"AAPL","date":"2007-01-03","open":86.29,"high":86.58,...

...

This assumes that the file exists on the local filesystem. Similarly, you can extract the schema embedded in an Avro file with the following command:

$ java -jar $HIP_HOME/lib/avro-tools-1.7.4.jar getschema stocks.avro

{

"type" : "record",

"name" : "Stock",

"namespace" : "hip.ch3.avro.gen",

"fields" : [ {

"name" : "symbol",

"type" : "string"

}, {

"name" : "date",

"type" : "string"

}, {

"name" : "open",

"type" : "double"

}, {

"name" : "high",

"type" : "double"

}, {

"name" : "low",

"type" : "double"

}, {

"name" : "close",

"type" : "double"

}, {

"name" : "volume",

"type" : "int"

}, {

"name" : "adjClose",

"type" : "double"

} ]

}

You can run the Avro tools without any options to view all the tools you can use:

$ java -jar $HIP_HOME/lib/avro-tools-1.7.4.jar

compile Generates Java code for the given schema.

concat Concatenates avro files without re-compressing.

fragtojson Renders a binary-encoded Avro datum as JSON.

fromjson Reads JSON records and writes an Avro data file.

fromtext Imports a text file into an avro data file.

getmeta Prints out the metadata of an Avro data file.

getschema Prints out schema of an Avro data file.

idl Generates a JSON schema from an Avro IDL file

induce Induce schema/protocol from Java class/interface

via reflection.

jsontofrag Renders a JSON-encoded Avro datum as binary.

recodec Alters the codec of a data file.

rpcprotocol Output the protocol of a RPC service

rpcreceive Opens an RPC Server and listens for one message.

rpcsend Sends a single RPC message.

tether Run a tethered mapreduce job.

tojson Dumps an Avro data file as JSON, one record per line.

totext Converts an Avro data file to a text file.

trevni_meta Dumps a Trevni file's metadata as JSON.

trevni_random Create a Trevni file filled with random instances

of a schema.

trevni_tojson Dumps a Trevni file as JSON.

One shortcoming of the tojson tool is that it doesn’t support reading data in HDFS. I’ve therefore bundled a utility with the book’s code called AvroDump that can dump a text representation of Avro data in HDFS, which we’ll use shortly to examine the output of Avro MapReduce jobs:

$ hip hip.util.AvroDump --file stocks.avro

This utility supports multiple files (supplied as a comma-separated list) and globbing, so you can use wildcards. The following example shows how you would dump out the contents of a MapReduce job that produced Avro output into a directory called mr-output-dir:

$ hip hip.util.AvroDump --file mr-output-dir/part*

Let’s see how Avro integrates with MapReduce.

Technique 13 Selecting the appropriate way to use Avro in MapReduce

Avro supports more than one way to work with your Avro data in MapReduce. This technique enumerates the different ways you can work with your data and provides guidance on which situations call for which approach.

Problem

You want to use Avro in your MapReduce job, but it’s unclear which of the available integration options you should choose.

Solution

Learn more about each integration option, and pick the one best suited for your use case.

Discussion

There are three ways that you can use Avro in MapReduce, and the specific details on how to use each are discussed in techniques that follow this one. These are the three approaches:

· Mixed-mode —Appropriate when you want to mix Avro data with non-Avro data in your job

· Record-based —Useful when data is supplied in a non-key/value way

· Key/value-based —For when your Avro data is modeled as key/value pairs and you want MapReduce to see it as native key/value pairs

Let’s cover each method in more detail.

Mixed-mode

This use case is for instances where any one of these conditions holds true:

· Your mapper input data isn’t in Avro form.

· You don’t want to emit intermediate data between your mappers and reducers using Avro.

· Your job output data isn’t in Avro form.

In any of these cases, the Avro mapper and reducer classes won’t help you, as they are designed with the assumption that Avro data is flowing end-to-end in your MapReduce job. In this case, you’ll want to use the regular MapReduce mapper and reducer classes and construct your job in a way that allows you to still work with Avro data.

Record-based

Avro data is record-based, which results in an impedance mismatch with MapReduce, which is key/value-based. To support Avro’s record-based roots, Avro comes bundled with a mapper class that isn’t key/value-based and instead supplies derived classes with a single record.

Key/value-based

If your Avro data internally follows a key/value structure, you can use some Avro-supplied mapper classes that will transform your Avro records and supply them in a key/value form to your mapper. With this method, you’re restricted to schemas that literally have “key” and “value” elements.

Summary

Selecting the right level of integration with Avro is a function of your inputs and outputs, and how you want to work with data inside of Avro. This technique examined three ways of integrating with Avro so that you can pick the right method for your use case. In the following techniques, we’ll look at how to use each of these integration methods in your MapReduce jobs.

Technique 14 Mixing Avro and non-Avro data in MapReduce

This level of Avro integration in MapReduce is suitable in cases where you have non-Avro input and generate Avro outputs, or vice versa, in which case the Avro mapper and reducer classes aren’t suitable. In this technique, we’ll look at how to work in a mixed-mode fashion with Avro.

Problem

You want to use Avro in a mixed mode in your MapReduce job, which isn’t supported by the Avro-bundled mapper and reducer classes.

Solution

Use low-level methods to set up your job and drive Avro data through your MapReduce job using the regular Hadoop mapper and reducer classes.

Discussion

Avro comes with some mapper and reducer classes that you can subclass to work with Avro. They’re useful in situations where you want your mappers and reducers to exchange Avro objects. But if you don’t have a requirement to pass Avro objects between your map and reduce tasks, you’re better off using the Avro input and output format classes directly, as you’ll see in the following code, which produces an average of all of the opening stock values.

We’ll start with a look at the job configuration. Your job is to consume stock data and produce stock averages, both in Avro formats.[28] To do this, you need to set the job configuration with the schema information for both schemas. You also need to specify Avro’s input and output format classes:[29]

28 Even though this technique is about mixing Avro and non-Avro data together in your jobs, I show Avro being used throughout the job so that you can pick which aspect you wish to integrate into your job. For example, if you have text inputs and Avro outputs, you’d use a regular TextInputFormat, and set the Avro output format.

29 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroMixedMapReduce.java.
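A sketch of the essential driver wiring, using the old-API org.apache.avro.mapred classes (AvroJob, AvroInputFormat, and AvroOutputFormat), might look like the following; inputPath and outputPath are placeholder variables, and Map and Reduce are the classes sketched next:

JobConf job = new JobConf(getConf(), AvroMixedMapReduce.class);

job.setInputFormat(AvroInputFormat.class);       // Avro container files in
AvroJob.setInputSchema(job, Stock.SCHEMA$);      // reader schema

job.setOutputFormat(AvroOutputFormat.class);     // Avro container files out
AvroJob.setOutputSchema(job, StockAvg.SCHEMA$);  // writer schema

job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);

// intermediate map outputs are plain Writables, not Avro
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);

FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);

JobClient.runJob(job);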

Next up is the Map class. The entire Avro record is supplied as the input key to your map function, because Avro supports records, not key/value pairs (although, as you’ll see later, Avro does have a way to provide data to your map function using key/value pairs if your Avro schema has fields called key and value). From an implementation perspective, your map function extracts the necessary fields from the stock record and emits them to the reducer, with the stock symbol and the opening stock price as the key/value pairs:[30]

30 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroMixedMapReduce.java.
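A sketch of such a mapper follows; AvroInputFormat supplies each record wrapped in an AvroWrapper as the input key, with a NullWritable value:

public static class Map extends MapReduceBase
    implements Mapper<AvroWrapper<Stock>, NullWritable, Text, DoubleWritable> {

  @Override
  public void map(AvroWrapper<Stock> key, NullWritable value,
                  OutputCollector<Text, DoubleWritable> output,
                  Reporter reporter) throws IOException {
    Stock stock = key.datum();
    output.collect(new Text(stock.getSymbol().toString()),
                   new DoubleWritable(stock.getOpen()));
  }
}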

Why is the “old” MapReduce API being used?

You may have noticed that the example in this technique uses the older org.apache.hadoop.mapred API. This is because the AvroInputFormat and AvroOutputFormat classes used in this technique only support the old API.

Finally, the reduce function sums together all of the stock prices for each stock and outputs an average price:[31]

31 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroMixedMapReduce.java.
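A corresponding reducer sketch, which wraps the output record in an AvroWrapper for the AvroOutputFormat, might look like this:

public static class Reduce extends MapReduceBase
    implements Reducer<Text, DoubleWritable, AvroWrapper<StockAvg>, NullWritable> {

  @Override
  public void reduce(Text key, Iterator<DoubleWritable> values,
                     OutputCollector<AvroWrapper<StockAvg>, NullWritable> output,
                     Reporter reporter) throws IOException {
    double total = 0;
    int count = 0;
    while (values.hasNext()) {
      total += values.next().get();
      count++;
    }
    StockAvg avg = new StockAvg();
    avg.setSymbol(key.toString());
    avg.setAvg(total / count);
    output.collect(new AvroWrapper<StockAvg>(avg), NullWritable.get());
  }
}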

You can run the MapReduce code as follows:

$ hip hip.ch3.avro.AvroMixedMapReduce \

--input stocks.avro \

--output output

Your MapReduce job is outputting a different Avro object (StockAvg) from the job input. You can verify that the job produced the output you expected by writing some code (not listed) to dump your Avro objects:

$ hip hip.util.AvroDump --file output/part*

{"symbol": "AAPL", "avg": 68.631}

{"symbol": "CSCO", "avg": 31.147999999999996}

{"symbol": "GOOG", "avg": 417.47799999999995}

{"symbol": "MSFT", "avg": 44.63100000000001}

{"symbol": "YHOO", "avg": 69.333}

Summary

This technique is useful in cases where you don’t want intermediary map outputs in Avro form, or if you have non-Avro inputs or outputs. Next we’ll look at the Avro-native way of working with data in MapReduce.

Technique 15 Using Avro records in MapReduce

Avro isn’t a native key/value serialization format, unlike SequenceFile, so it can require a little shoehorning to get it to work with MapReduce. In this technique you’ll examine the Avro-specific mapper and reducer classes that expose a record-based interface you can use to input and output data.

Problem

You want to use Avro end-to-end in your MapReduce job, and you also wish to interact with your input and output data in record-oriented form.

Solution

Extend the AvroMapper and AvroReducer classes to implement your MapReduce job.

Discussion

Avro comes with two classes that abstract away the key/value nature of MapReduce and instead expose a record-based API. In this technique you’ll implement the same MapReduce job as in the prior technique (calculating the average open prices for each stock symbol), and use Avro throughout the job.

First, let’s look at the Mapper class, which will extend AvroMapper: [32]

32 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroRecordMapReduce.java.
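A sketch of the mapper, built from the old-API org.apache.avro.mapred classes AvroMapper, AvroCollector, and Pair, might look like this:

public static class Mapper extends AvroMapper<Stock, Pair<Utf8, Stock>> {
  @Override
  public void map(Stock stock,
                  AvroCollector<Pair<Utf8, Stock>> collector,
                  Reporter reporter) throws IOException {
    // emit the symbol as the key and the whole record as the value
    collector.collect(
        new Pair<Utf8, Stock>(new Utf8(stock.getSymbol().toString()), stock));
  }
}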

The first thing to notice is that there are two types defined in the class definition, not four as is the norm in MapReduce. The AvroMapper abstracts away the key/value traits of the mapper inputs and outputs, replacing each with a single type.

If you had a map-only job, the types that you’d define would be the input and output types. But if you’re running a full-blown MapReduce job, you’ll need to use the Pair class so that you can define the map output key/value pairs. The Pair class requires that an Avro schema exists for the key and value parts, which is why the Utf8 class is used instead of a straight Java string.

Let’s now take a peek at the AvroReducer implementation. This time there are three types you need to define—the map output key and value types, and the reducer output type:[33]

33 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroRecordMapReduce.java.
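A matching reducer sketch, extending AvroReducer with the three types just described, might look like this:

public static class Reducer extends AvroReducer<Utf8, Stock, StockAvg> {
  @Override
  public void reduce(Utf8 symbol,
                     Iterable<Stock> stocks,
                     AvroCollector<StockAvg> collector,
                     Reporter reporter) throws IOException {
    double total = 0;
    int count = 0;
    for (Stock stock : stocks) {
      total += stock.getOpen();
      count++;
    }
    StockAvg avg = new StockAvg();
    avg.setSymbol(symbol.toString());
    avg.setAvg(total / count);
    collector.collect(avg);
  }
}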

Now you can plumb it all together in the driver. Here you’ll define the input and output types and the desired output compression, if any:[34]

34 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroRecordMapReduce.java.

AvroJob.setInputSchema(job, Stock.SCHEMA$);

AvroJob.setMapOutputSchema(job,

Pair.getPairSchema(Schema.create(Schema.Type.STRING), Stock.SCHEMA$));

AvroJob.setOutputSchema(job, StockAvg.SCHEMA$);

AvroJob.setMapperClass(job, Mapper.class);

AvroJob.setReducerClass(job, Reducer.class);

FileOutputFormat.setCompressOutput(job, true);

AvroJob.setOutputCodec(job, SNAPPY_CODEC);

Done! Give it a whirl, and check the outputs after the job completes:

$ hip hip.ch3.avro.AvroRecordMapReduce \

--input stocks.avro \

--output output

...

$ hip hip.util.AvroDump --file output/part*

{"symbol": "AAPL", "avg": 68.631}

{"symbol": "CSCO", "avg": 31.147999999999996}

{"symbol": "GOOG", "avg": 417.47799999999995}

{"symbol": "MSFT", "avg": 44.63100000000001}

{"symbol": "YHOO", "avg": 69.333}

Summary

This technique is handy in situations where you want to keep your data in Avro form throughout the MapReduce job, and you don’t have a requirement that your input or output data be key/value-based.

But what if you do need your data to be key/value-based, and you still want to use Avro goodies such as compact serialization size and built-in compression?

Technique 16 Using Avro key/value pairs in MapReduce

MapReduce’s native data model is key/value pairs, and as I’ve mentioned earlier, Avro’s is record-based. Avro doesn’t have native support for key/value data, but some helper classes exist in Avro to help model key/value data and to use this natively in MapReduce.

Problem

You want to use Avro as a data format and container, but you want to model your data using key/value pairs in Avro and use them as native key/value pairs in MapReduce.

Solution

Use the AvroKeyValue, AvroKey, and AvroValue classes to work with Avro key/value data.

Discussion

Avro has an AvroKeyValue class that encapsulates a generic Avro record containing two fields named key and value. AvroKeyValue serves as a helper class so that you can easily read and write key/value data. The types of these fields are defined by you.

In this technique you’ll repeat the average stock MapReduce job, but this time using Avro’s key/value framework. You’ll first need to generate the input data for your job. In this case, we’ll put the stock symbol in the key and the Stock object in the value:[35]

35 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroKeyValueFileWrite.java.
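A rough sketch of such a writer, using the AvroKeyValue helper from the org.apache.avro.hadoop.io package (csvFile and outputPath are placeholder variables, and fromCsv is the helper from technique 12), might look like this:

Schema keySchema = Schema.create(Schema.Type.STRING);
Schema kvSchema = AvroKeyValue.getSchema(keySchema, Stock.SCHEMA$);

OutputStream os = FileSystem.get(new Configuration()).create(outputPath);
DataFileWriter<GenericRecord> writer =
    new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(kvSchema));
writer.create(kvSchema, os);

BufferedReader reader = new BufferedReader(new FileReader(csvFile));
String line;
while ((line = reader.readLine()) != null) {
  Stock stock = fromCsv(line);
  AvroKeyValue<CharSequence, Stock> record =
      new AvroKeyValue<CharSequence, Stock>(new GenericData.Record(kvSchema));
  record.setKey(stock.getSymbol());    // the stock symbol is the key
  record.setValue(stock);              // the full Stock record is the value
  writer.append(record.get());
}
reader.close();
writer.close();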

Go ahead and generate a file in HDFS containing the stock data in key/value format:

$ hip hip.ch3.avro.AvroKeyValueFileWrite \

--input test-data/stocks.txt \

--output stocks.kv.avro

If you’re curious to know the Avro schema definition of the file you just generated, use the tip highlighted in technique 12 to extract the schema from the file. In addition, you can use the AvroDump utility to show the contents of the file:

# the "getschema" tool only works with data in the local filesystem,

# so first copy the stocks file from HDFS to local disk

$ hadoop fs -get stocks.kv.avro .

$ java -jar $HIP_HOME/lib/avro-tools-1.7.4.jar getschema stocks.kv.avro

{

"type" : "record",

"name" : "KeyValuePair",

"namespace" : "org.apache.avro.mapreduce",

"doc" : "A key/value pair",

"fields" : [ {

"name" : "key",

"type" : "string",

"doc" : "The key"

}, {

"name" : "value",

"type" : {

"type" : "record",

"name" : "Stock",

"namespace" : "hip.ch3.avro.gen",

"fields" : [ {

"name" : "symbol",

"type" : "string"

}, {

"name" : "date",

"type" : "string"

}, {

"name" : "open",

"type" : "double"

}, {

"name" : "high",

"type" : "double"

}, {

"name" : "low",

"type" : "double"

}, {

"name" : "close",

"type" : "double"

}, {

"name" : "volume",

"type" : "int"

}, {

"name" : "adjClose",

"type" : "double"

} ]

},

"doc" : "The value"

} ]

}

$ hip hip.util.AvroDump --file stocks.kv.avro

{"key": "AAPL", "value": {"symbol": "AAPL", "date": "2009-01-02", ...

{"key": "AAPL", "value": {"symbol": "AAPL", "date": "2008-01-02", ...

{"key": "AAPL", "value": {"symbol": "AAPL", "date": "2007-01-03", ...

Now for some MapReduce code—you’ll define your mapper, reducer, and driver in one shot:[36]

36 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/avro/AvroKeyValueMapReduce.java.

public int run(final String[] args) throws Exception {

....

job.setInputFormatClass(AvroKeyValueInputFormat.class);

AvroJob.setInputKeySchema(job, Schema.create(Schema.Type.STRING));

AvroJob.setInputValueSchema(job, Stock.SCHEMA$);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(DoubleWritable.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(AvroValue.class);

job.setOutputFormatClass(AvroKeyValueOutputFormat.class);

AvroJob.setOutputValueSchema(job, StockAvg.SCHEMA$);

...

}

public static class Map extends

Mapper<AvroKey<CharSequence>, AvroValue<Stock>,

Text, DoubleWritable> {

@Override

public void map(AvroKey<CharSequence> key,

AvroValue<Stock> value,

Context context) {

context.write(new Text(key.toString()),

new DoubleWritable(value.datum().getOpen()));

}

}

public static class Reduce extends

Reducer<Text, DoubleWritable, Text, AvroValue<StockAvg>> {

@Override

protected void reduce(Text key,

Iterable<DoubleWritable> values,

Context context) {

double total = 0.0;

double count = 0;

for (DoubleWritable val: values) {

total += val.get();

count++;

}

StockAvg avg = new StockAvg();

avg.setSymbol(key.toString());

avg.setAvg(total / count);

context.write(key, new AvroValue<StockAvg>(avg));

}

}

As you can see, the AvroKey and AvroValue wrappers are used to supply input data in the mapper, as well as output data in the reducer. The neat thing here is that Avro is smart enough to support Hadoop Writable objects and automatically convert them into their Avro counterparts, which is why you don’t need to tell Avro the schema type of the output key.

You can run the MapReduce job with the following command:

$ hip hip.ch3.avro.AvroKeyValueMapReduce \

--input stocks.kv.avro \

--output output

And again, you can view the output with the AvroDump tool:

$ hip hip.util.AvroDump --file output/part*

{"key": "AAPL", "value": {"symbol": "AAPL", "avg": 68.631}}

{"key": "CSCO", "value": {"symbol": "CSCO", "avg": 31.148}}

{"key": "GOOG", "value": {"symbol": "GOOG", "avg": 417.478}}

{"key": "MSFT", "value": {"symbol": "MSFT", "avg": 44.631}}

{"key": "YHOO", "value": {"symbol": "YHOO", "avg": 69.333}}

Summary

This concludes our coverage of the three Avro approaches for working with your data in MapReduce. Each of the methods is suited to a particular task, and you can select whichever one works best for your needs.

Let’s wrap up our Avro and MapReduce coverage by looking at how you can customize ordering characteristics of Avro data in MapReduce.

Technique 17 Controlling how sorting works in MapReduce

If you decide to use Avro data as intermediary map outputs, you may be wondering what control you have over how partitioning, sorting, and grouping work.

Problem

You want control over how MapReduce sorts your reducer inputs.

Solution

Modify the Avro schema to alter ordering behavior.

Discussion

If an Avro object is used as the key output in a mapper, the following happens by default:

· All the fields in the Avro object are used for partitioning, sorting, and grouping.

· The fields are ordered using their ordinal position in the schema. This means that if you have a schema with two elements, the first element in the schema is used for sorting first, followed by the second element.

· Within an element, sorting occurs using comparisons that are specific to the type. So if strings are being compared, the sorting will be lexicographical, and if numbers are being compared, numerical comparison is used.

Some of this behavior can be changed. The following is a modified version of the Stock schema:

{

"name": "Stock",

"type": "record",

"namespace": "hip.ch3.avro.gen",

"fields": [

{"name": "symbol", "type": "string"},

{"name": "date", "type": "string"},

{"name": "open", "type": "double", "order": "descending"},

{"name": "high", "type": "double", "order": "ignore"}

]

}

You can modify the sorting behavior for a field by decorating it with an order attribute and specifying that descending order should be used. Alternatively, you can exclude a field from partitioning, sorting, and grouping by setting the order to ignore.

Note that these are schema-wide settings, and there’s no easy way to specify custom partition/sort/group settings on a per-job basis. You can go ahead and write your own partition, sort, and group functions (just like you would for a Writable), but it would be useful if Avro had helper functions to simplify this process.

Technique 18 Avro and Hive

It wasn’t until recently that the Hive project had built-in support for Avro. This technique looks at how you can work with Avro data in Hive.

Problem

You want to work with Avro data in Hive.

Solution

Use Hive’s Avro Serializer/Deserializer.

Discussion

Hive version 0.9.1 and newer come bundled with an Avro SerDe, short for Serializer/Deserializer, which allows Hive to read data in from a table and write it back out to a table. The appendix has instructions on how to install Hive.

You need to copy the Avro schemas bundled with this book into HDFS, and also create a directory containing some example Avro stock records:

$ hadoop fs -put $HIP_HOME/schema schema

$ hadoop fs -mkdir stock_hive

$ hip hip.ch3.avro.AvroStockFileWrite \

--input test-data/stocks.txt \

--output stock_hive/stocks.avro

Next, fire up the Hive console and create an external Hive table for the directory you just created. You also need to specify the location of the Avro schema in HDFS. Replace YOUR-HDFS-USERNAME with your HDFS username:

hive> CREATE EXTERNAL TABLE stocks

COMMENT "An Avro stocks table"

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'

STORED AS

INPUTFORMAT

'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'

OUTPUTFORMAT

'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

LOCATION '/user/YOUR-HDFS-USERNAME/stock_hive/'

TBLPROPERTIES (

'avro.schema.url'='hdfs:///user/YOUR-HDFS-USERNAME/schema/stock.avsc'

);

AvroSerDe actually supports three ways to define a schema for an Avro table—for this technique, I picked the method that you’ll most likely want to use in production, but for more details on the other ways to specify a schema, refer to the AvroSerDe site: https://cwiki.apache.org/confluence/display/Hive/AvroSerDe.

Just like with any Hive table, you can query Hive to describe the schema for a table:

hive> describe stocks;

symbol string

date string

open double

high double

low double

close double

volume int

adjclose double

Run a query to verify that everything’s working. The following Hive Query Language (HiveQL) will count the number of stock records for each stock symbol:

hive> SELECT symbol, count(*) FROM stocks GROUP BY symbol;

AAPL 10

CSCO 10

GOOG 5

MSFT 10

YHOO 10

What if you wanted to write data to an Avro-backed Hive table? The following example shows how you would copy a subset of the records in the stocks table and insert them into a new table. This example also highlights how you’d use the Snappy compression codec for any writes into the new table:

hive> SET hive.exec.compress.output=true;

hive> SET avro.output.codec=snappy;

hive> CREATE TABLE google_stocks

COMMENT "An Avro stocks table containing just Google stocks"

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'

STORED AS

INPUTFORMAT

'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'

OUTPUTFORMAT

'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

TBLPROPERTIES (

'avro.schema.url'='hdfs:///user/YOUR-HDFS-USERNAME/schema/stock.avsc'

);

OK

hive> INSERT OVERWRITE TABLE google_stocks

SELECT * FROM stocks WHERE symbol = 'GOOG';

OK

hive> select * from google_stocks limit 5;

OK

GOOG 2009-01-02 308.6 321.82 305.5 321.32 3610500 321.32

GOOG 2008-01-02 692.87 697.37 677.73 685.19 4306900 685.19

GOOG 2007-01-03 466.0 476.66 461.11 467.59 7706500 467.59

GOOG 2006-01-03 422.52 435.67 418.22 435.23 13121200 435.23

GOOG 2005-01-03 197.4 203.64 195.46 202.71 15844200 202.71

For more details on Hive, please refer to chapter 9. Next we’ll look at how you’d perform the same sequence of actions in Pig.

Technique 19 Avro and Pig

Much like Hive, Pig also has built-in support for Avro, which is covered in this technique.

Problem

You want to read and write Avro data using Pig.

Solution

Use the AvroStorage class in Pig’s Piggy Bank library.

Discussion

Piggy Bank is a library that contains a useful collection of Pig utilities, one of which is the AvroStorage class you can use to read and write Avro data in HDFS. In this technique you’ll mirror the steps you took in the previous Hive technique—you’ll read in some stock data, perform some simple aggregations, and store some filtered data back into HDFS.

Before you get started, load some Avro stock data into a directory in HDFS:

$ hadoop fs -put $HIP_HOME/schema schema

$ hadoop fs -mkdir stock_pig

$ hip hip.ch3.avro.AvroStockFileWrite \

--input test-data/stocks.txt \

--output stock_pig/stocks.avro

In Pig-land, your first step is to register the JARs required for AvroStorage to work. You may have to hunt down the specific location of the JARs bundled with the Hadoop distribution that you’re using. The locations in the following code assume that Apache Hadoop and Pig were installed under /usr/local:

$ pig

REGISTER /usr/local/pig/contrib/piggybank/java/piggybank.jar;

REGISTER /usr/local/hadoop/share/hadoop/common/lib/avro-*.jar;

REGISTER /usr/local/hadoop/share/hadoop/common/lib/jackson-*.jar;

REGISTER /usr/local/hadoop/share/hadoop/common/lib/snappy-*.jar;

REGISTER /usr/local/hadoop/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/json-*.jar;

Next, load the stocks into a Pig relation and then display the schema details using the LOAD and DESCRIBE operators:

grunt> stocks = LOAD 'stock_pig/' USING

org.apache.pig.piggybank.storage.avro.AvroStorage();

grunt> DESCRIBE stocks;

records: {symbol: chararray,date: chararray,open: double,

high: double,low: double,close: double,volume: int,

adjClose: double}

Notice that you didn’t have to supply details about the Avro schema. That’s because the Avro container format you used had the schema embedded in the header. If your files don’t have the schema embedded, AvroStorage can still support your data, but you’ll need to upload the Avro schema to HDFS (like you did in Hive) and use the “schema_file” option—check out the Pig documentation for more details.[37]

37 More Avro and Pig integration details are available on the AvroStorage page: https://cwiki.apache.org/confluence/display/PIG/AvroStorage.

To validate that Avro and Pig are working together, you can perform a simple aggregation and count the number of stock records for each stock symbol:

grunt> by_symbol = GROUP stocks BY symbol;

grunt> symbol_count = foreach by_symbol generate group, COUNT($1);

grunt> dump symbol_count;

(AAPL,10)

(CSCO,10)

(GOOG,5)

(MSFT,10)

(YHOO,10)

The following example shows how you can write out Avro data in Pig. The example filters the Google stocks from the input data and writes them into a new output directory in HDFS. This also shows how you can compress job outputs using Snappy:

grunt> SET mapred.compress.map.output true;

grunt> SET mapred.output.compress true;

grunt> SET mapred.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;

grunt> SET avro.output.codec snappy;

grunt> google_stocks = FILTER stocks BY symbol == 'GOOG';

grunt> STORE google_stocks INTO 'stock_pig_output/'

USING org.apache.pig.piggybank.storage.avro.AvroStorage(

'no_schema_check',

'data', 'stock_pig/');

When writing Avro data to HDFS, you’ll need to specify the Avro schema of the data you’re persisting. The preceding example uses the data option to tell AvroStorage to use the Avro schema embedded in files under your input directory.

As with loading files, there are various other methods for telling AvroStorage your schema details, and these are documented on Pig’s wiki.[38]

38 Additional resources on AvroStorage are at https://cwiki.apache.org/confluence/display/PIG/AvroStorage.

Summary

The last few techniques have demonstrated how easy and straightforward it is to use Avro with MapReduce, Hive, and Pig. Using Avro to store your data gives you a number of useful free features, such as versioning support, compression, splittability, and code generation. Avro’s strong integration with MapReduce, Hive, Pig, and numerous other tools, such as Impala and Flume, means that it’s worth consideration as your data format of choice.

Until now we’ve focused on row-based file formats, which aren’t always the best way to lay out data. In the next section you’ll learn about the advantages of columnar storage and see examples of Parquet, a columnar storage format, in action.

3.4. Columnar storage

When data is written to an I/O device (say a flat file, or a table in a relational database), the most common way to lay out that data is row-based, meaning that all the fields for the first row are written first, followed by all the fields for the second row, and so on. This is how most relational databases write out tables by default, and the same goes for most data serialization formats such as XML, JSON, and Avro container files.

Columnar storage works differently—it lays out data by column first, and then by row. All the values of the first field across all the records are written first, followed by the second field, and so on. Figure 3.12 highlights the differences between the two storage schemes in how the data is laid out.

Figure 3.12. How row and column storage systems lay out their data

There are two main benefits to storing data in columnar form:

· Systems that read columnar data can efficiently extract a subset of the columns, reducing I/O. Row-based systems typically need to read the entire row even if just one or two columns are needed.

· Optimizations can be made when writing columnar data, such as run-length encoding and bit packing, to efficiently compress the size of the data being written. General compression schemes also work well for compressing columnar data because compression works best on data that has a lot of repeating data, which is the case when columnar data is physically colocated.

As a result, columnar file formats work best when working with large datasets where you wish to filter or project data, which is exactly the type of work that’s commonly performed in OLAP-type use cases, as well as MapReduce.

The majority of data formats used in Hadoop, such as JSON and Avro, are row-ordered, which means that you can’t apply the previously mentioned optimizations when reading and writing these files. Imagine that the data in figure 3.12 was in a Hive table and you were to execute the following query:

SELECT AVG(price) FROM stocks;

If the data was laid out in a row-based format, each row would have to be read, even though the only column being operated on is price. In a column-oriented store, only the price column would be read, which could result in drastically reduced processing times when you’re working with large datasets.

There are a number of columnar storage options that can be used in Hadoop:

· RCFile was the first columnar format available in Hadoop; it came out of a collaboration between Facebook and academia in 2009.[39] RCFile is a basic columnar store that supports separate column storage and column compression. It can support projection during reads, but misses out on the more advanced techniques such as run-length encoding. As a result, Facebook has been moving away from RCFile to ORC file.[40]

39 Yongqiang He, et al., “RCFile: A Fast and Space-efficient Data Placement Structure in MapReduce-based Warehouse Systems,” ICDE Conference 2011: www.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-4.pdf.

40 Facebook Engineering Blog, “Scaling the Facebook data warehouse to 300 PB,” https://code.facebook.com/posts/229861827208629/scaling-the-facebook-data-warehouse-to-300-pb/.

· ORC file was created by Facebook and Hortonworks to address RCFile’s shortcomings, and its serialization optimizations have yielded smaller data sizes compared to RCFile.[41] It also uses indexes to enable predicate pushdowns to optimize queries so that a column that doesn’t match a filter predicate can be skipped. ORC file is also fully integrated with Hive’s type system and can support nested structures.

41 Owen O’Malley, “ORC File Introduction,” www.slideshare.net/oom65/orc-fileintro.

· Parquet is a collaboration between Twitter and Cloudera and employs many of the tricks that ORC file uses to generate compressed files.[42] Parquet is a language-independent format with a formal specification.

42 Features such as column stats and indexes are planned for the Parquet 2 release.

RCFile and ORC file were designed to support Hive as their primary usage, whereas Parquet is independent of any other Hadoop tool and tries to maximize compatibility with the Hadoop ecosystem. Table 3.2 shows how these columnar formats integrate with various tools and languages.

Table 3.2. Columnar storage formats supported in Hadoop

Format

Hadoop support

Supported object models

Supported programming languages

Advanced compression support

RCFile

MapReduce, Pig, Hive (0.4+), Impala

Thrift, Protocol Buffers[a]

Java

No

ORC file

MapReduce, Pig, Hive (0.11+)

None

Java

Yes

Parquet

MapReduce, Pig, Hive, Impala

Avro, Protocol Buffers, Thrift

Java, C++, Python

Yes

a Elephant Bird provides the ability to use Thrift and Protocol Buffers with RCFile.

For this section, I’ll focus on Parquet due to its compatibility with object models such as Avro.

3.4.1. Understanding object models and storage formats

Before we get started with the techniques, we’ll cover a few Parquet concepts that are important in understanding the interplay between Parquet and Avro (and Thrift and Protocol Buffers):

· Object models are in-memory representations of data. Parquet exposes a simple object model that’s supplied more as an example than anything else. Avro, Thrift, and Protocol Buffers are full-featured object models. An example is the Avro Stock class, which was generated by Avro to richly model the schema using Java POJOs.

· Storage formats are serialized representations of a data model. Parquet is a storage format that serializes data in columnar form. Avro, Thrift, and Protocol Buffers also have their own storage formats that serialize data in row-oriented formats.[43] Storage formats can be thought of as the at-rest representation of data.

43 Avro does have a columnar storage format called Trevni: http://avro.apache.org/docs/1.7.6/trevni/spec.html.

· Parquet object model converters are responsible for converting an object model to Parquet’s data types, and vice versa. Parquet is bundled with a number of converters to maximize the interoperability and adoption of Parquet.

Figure 3.13 shows how these concepts work in the context of Parquet.

Figure 3.13. Parquet storage format and object model converters

What’s unique about Parquet is that it has converters that allow it to support common object models such as Avro. Behind the scenes, the data is stored in Parquet binary form, but when you’re working with your data, you’re using your preferred object model, such as Avro objects. This gives you the best of both worlds: you can continue to use a rich object model such as Avro to interact with your data, and that data will be efficiently laid out on disk using Parquet.

Storage format interoperability

Storage formats generally aren’t interoperable. When you’re combining Avro and Parquet, you’re combining Avro’s object model and Parquet’s storage format. Therefore, if you have existing Avro data sitting in HDFS that was serialized using Avro’s storage format, you can’t read that data using Parquet’s storage format, as they are two very different ways of encoding data. The reverse is also true—Parquet can’t be read using the normal Avro methods (such as the AvroInputFormat in MapReduce); you must use Parquet implementations of input formats and Hive SerDes to work with Parquet data.

To summarize, choose Parquet if you want your data to be serialized in a columnar form. Once you’ve selected Parquet, you’ll need to decide which object model you’ll be working with. I recommend you pick the object model that has the most traction in your organization. Otherwise I recommend going with Avro (section 3.3.5 explains why Avro can be a good choice).

The Parquet file format

The Parquet file format is beyond the scope of this book; for more details, take a look at the Parquet project page at https://github.com/Parquet/parquet-format.

3.4.2. Parquet and the Hadoop ecosystem

The goal of Parquet is to maximize support throughout the Hadoop ecosystem. It currently supports MapReduce, Hive, Pig, Impala, and Spark, and hopefully we’ll see it being supported by other systems such as Sqoop.

Because Parquet is a standard file format, a Parquet file that’s written by any one of these technologies can also be read by the others. Maximizing support across the Hadoop ecosystem is critical to the success of a file format, and Parquet is poised to become the ubiquitous file format in big data.

It’s also reassuring that Parquet isn’t focused on a particular subset of technologies—in the words of the Parquet home page, “We are not interested in playing favorites” when it comes to ecosystem support (http://parquet.io). This implies that a primary goal of the project is to maximize its support for the tools that you’re likely to use, which is important as new tools continue to pop up on our radars.

3.4.3. Parquet block and page sizes

Figure 3.14 shows a high-level representation of the Parquet file format and highlights the key concepts.

Figure 3.14. Parquet’s file format

A more detailed overview of the file format can be seen at the project’s home page: https://github.com/Parquet/parquet-format.

Technique 20 Reading Parquet files via the command line

Parquet is a binary storage format, so using the standard hadoop fs -cat command will yield garbage on the command line. In this technique we’ll explore how you can use the command line to not only view the contents of a Parquet file, but also to examine the schema and additional metadata contained in Parquet files.

Problem

You want to use the command line to examine the contents of a Parquet file.

Solution

Use the utilities bundled with the Parquet tools.

Discussion

Parquet is bundled with a tools JAR containing some useful utilities that can dump information in Parquet files to standard output.

Before you get started, you’ll need to create a Parquet file so that you can test out the tools. The following example creates a Parquet file by writing Avro records:

$ hip hip.ch3.parquet.ParquetAvroStockWriter \

--input test-data/stocks.txt \

--output stocks.parquet

The first Parquet tool you’ll use is cat, which performs a simple dump of the data in the Parquet file to standard output:

$ hip --nolib parquet.tools.Main cat stocks.parquet

symbol = AAPL

date = 2009-01-02

open = 85.88

...

You can use the Parquet head command instead of cat in the preceding example to emit only the first five records. There’s also a dump command that allows you to specify a subset of the columns that should be dumped, although the output isn’t as human-readable.

Parquet has its own internal data types and schema that are mapped to external object models by converters. The internal Parquet schema can be viewed using the schema option:

$ hip --nolib parquet.tools.Main schema stocks.parquet

message hip.ch3.avro.gen.Stock {

required binary symbol (UTF8);

required binary date (UTF8);

required double open;

required double high;

required double low;

required double close;

required int32 volume;

required double adjClose;

}

Parquet also allows object models to use the metadata to store information needed for deserialization. Avro, for example, uses the metadata to store the Avro schema, as can be seen in the output of the command that follows:

$ hip --nolib parquet.tools.Main meta stocks.parquet

creator: parquet-mr (build 3f25ad97f20...)

extra: avro.schema = {"type":"record","name":"Stock","namespace" ...

file schema: hip.ch3.avro.gen.Stock

---------------------------------------------------------------------

symbol: REQUIRED BINARY O:UTF8 R:0 D:0

date: REQUIRED BINARY O:UTF8 R:0 D:0

open: REQUIRED DOUBLE R:0 D:0

high: REQUIRED DOUBLE R:0 D:0

low: REQUIRED DOUBLE R:0 D:0

close: REQUIRED DOUBLE R:0 D:0

volume: REQUIRED INT32 R:0 D:0

adjClose: REQUIRED DOUBLE R:0 D:0

row group 1: RC:45 TS:2376

---------------------------------------------------------------------

symbol: BINARY SNAPPY DO:0 FPO:4 SZ:85/84/0.99 VC:45 ENC:PD ...

date: BINARY SNAPPY DO:0 FPO:89 SZ:127/198/1.56 VC:45 ENC ...

open: DOUBLE SNAPPY DO:0 FPO:216 SZ:301/379/1.26 VC:45 EN ...

high: DOUBLE SNAPPY DO:0 FPO:517 SZ:297/379/1.28 VC:45 EN ...

low: DOUBLE SNAPPY DO:0 FPO:814 SZ:292/379/1.30 VC:45 EN ...

close: DOUBLE SNAPPY DO:0 FPO:1106 SZ:299/379/1.27 VC:45 E ...

volume: INT32 SNAPPY DO:0 FPO:1405 SZ:203/199/0.98 VC:45 EN ...

adjClose: DOUBLE SNAPPY DO:0 FPO:1608 SZ:298/379/1.27 VC:45 E ...

Next let’s look at how you can write and read Parquet files.

Technique 21 Reading and writing Avro data in Parquet with Java

One of the first things you’ll want to do when working with a new file format is to understand how a standalone Java application can read and write data. This technique shows how you can write Avro data into a Parquet file and read it back out.

Problem

You want to read and write Parquet data directly from your Java code outside of Hadoop using an Avro object model.

Solution

Use the AvroParquetWriter and AvroParquetReader classes.

Discussion

Parquet, a columnar storage format for Hadoop, has support for Avro, which allows you to work with your data using Avro classes, and to efficiently encode the data using Parquet’s file format so that you can take advantage of the columnar layout of your data. It sounds odd to mix data formats like this, so let’s investigate why you’d want to do this and how it works.

Parquet is a storage format, and it has a formal programming language–agnostic specification. You could use Parquet directly without any other supporting data format such as Avro, but Parquet is at heart a simple data format and doesn’t support complex types such as maps or unions. This is where Avro comes into play, as it supports these richer types as well as features such as code generation and schema evolution. As a result, marrying Parquet and a rich data format such as Avro creates a perfect match of sophisticated schema capabilities coupled with efficient data encoding.

For this technique, we’ll continue to use the Avro Stock schema. First, let’s look at how you can write a Parquet file using these Stock objects.[44]

44 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/ParquetAvroStockWriter.java.
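A minimal sketch of such a writer, using the AvroParquetWriter class from the parquet-avro module (csvFile and outputPath are placeholder variables, and fromCsv is the CSV-to-Stock helper from technique 12), might look like this:

AvroParquetWriter<Stock> writer = new AvroParquetWriter<Stock>(
    outputPath,
    Stock.SCHEMA$,
    CompressionCodecName.SNAPPY,          // matches the SNAPPY columns shown by the meta tool
    ParquetWriter.DEFAULT_BLOCK_SIZE,
    ParquetWriter.DEFAULT_PAGE_SIZE);

BufferedReader reader = new BufferedReader(new FileReader(csvFile));
String line;
while ((line = reader.readLine()) != null) {
  writer.write(fromCsv(line));            // each Stock becomes a row in the Parquet file
}
reader.close();
writer.close();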

The following command generates a Parquet file by executing the preceding code:

$ hip hip.ch3.parquet.ParquetAvroStockWriter \

--input test-data/stocks.txt \

--output stocks.parquet

The previous technique showed you how to use the Parquet tools to dump the file to standard output. But what if you wanted to read the file in Java?[45]

45 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/ParquetAvroStockReader.java.

ParquetReader<Stock> reader = new AvroParquetReader<Stock>(inputFile);

Stock stock;

while((stock = reader.read()) != null) {

System.out.println(stock);

}

reader.close();

The following command shows the output of the preceding code:

$ hip hip.ch3.parquet.ParquetAvroStockReader \

--input stocks.parquet

AAPL,2009-01-02,85.88,91.04,85.16,90.75,26643400,90.75

AAPL,2008-01-02,199.27,200.26,192.55,194.84,38542100,194.84

AAPL,2007-01-03,86.29,86.58,81.9,83.8,44225700,83.8

...

Technique 22 Parquet and MapReduce

This technique examines how you can work with Parquet files in MapReduce. Using Parquet as a data source as well as a data sink in MapReduce will be covered.

Problem

You want to work with Avro data serialized as Parquet in MapReduce.

Solution

Use the AvroParquetInputFormat and AvroParquetOutputFormat classes.

Discussion

The Avro subproject in Parquet comes with MapReduce input and output formats to let you read and write your Avro data using Parquet as the storage format. The following example calculates the average stock price for each symbol:[46]

46 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/AvroParquetMapReduce.java.
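A sketch of what such a job could look like follows; it assumes that AvroParquetInputFormat supplies a Void key and the Stock record as the value, and that the input and output paths arrive as plain arguments rather than through the book’s option parsing:

public class AvroParquetMapReduce extends Configured implements Tool {

  public static class Map extends Mapper<Void, Stock, Text, DoubleWritable> {
    @Override
    public void map(Void key, Stock value, Context context)
        throws IOException, InterruptedException {
      context.write(new Text(value.getSymbol().toString()),
                    new DoubleWritable(value.getOpen()));
    }
  }

  public static class Reduce extends Reducer<Text, DoubleWritable, Void, StockAvg> {
    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
        throws IOException, InterruptedException {
      double total = 0;
      int count = 0;
      for (DoubleWritable val : values) {
        total += val.get();
        count++;
      }
      StockAvg avg = new StockAvg();
      avg.setSymbol(key.toString());
      avg.setAvg(total / count);
      context.write(null, avg);           // the output key is Void
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = new Job(getConf());
    job.setJarByClass(AvroParquetMapReduce.class);

    job.setInputFormatClass(AvroParquetInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);

    job.setOutputFormatClass(AvroParquetOutputFormat.class);
    AvroParquetOutputFormat.setSchema(job, StockAvg.SCHEMA$);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }
}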

Working with Avro in Parquet is very simple, and arguably easier than working with Avro-serialized data.[47] You can run the example:

47 The input and output formats supplied with Avro to support Avro’s storage format wrap the Avro objects, requiring a level of indirection.

$ hip hip.ch3.parquet.AvroParquetMapReduce \

--input stocks.parquet \

--output output

Parquet comes with some tools to help you work with Parquet files, and one of them allows you to dump the contents to standard output:

$ hdfs -ls output

output/_SUCCESS

output/_metadata

output/part-r-00000.parquet

$ hip --nolib parquet.tools.Main cat output/part-r-00000.parquet

symbol = AAPL

avg = 68.631

symbol = CSCO

avg = 31.148000000000003

symbol = GOOG

avg = 417.47799999999995

symbol = MSFT

avg = 44.63100000000001

symbol = YHOO

avg = 69.333

You may have noticed that there’s an additional file in the output directory called _metadata. When the Parquet OutputCommitter runs upon job completion, it reads the footer of all the output files (which contains the file metadata) and generates this summarized metadata file. This file is used by subsequent MapReduce (or Pig/Hive) jobs to reduce job startup times.[48]

48 Calculating the input splits can take a long time when there are a large number of input files that need to have their footers read, so having the ability to read a single summary file is a useful optimization.

Summary

In this technique, you saw how to use code-generated Avro object files with Parquet. If you don’t want to work with Avro object files, you have a few options that allow you to work with Avro data generically using Avro’s GenericData class:

· If you wrote the Avro data using GenericData objects, then that’s the format in which Avro will supply them to your mappers.

· Excluding the JAR containing your Avro-generated code will also result in GenericData objects being fed to your mappers.

· You can trick Avro by mutating the input schema so that Avro can’t load the specific class, forcing it to supply the GenericData instance instead.

The following code shows how you would perform the third option—you’re essentially taking the original schema and duplicating it, but in the process you’re supplying a different classname, which Avro won’t be able to load (see "foobar" in the first line):[49]

49 GitHub source https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/AvroGenericParquetMapReduce.java.

Schema schema = Schema.createRecord("foobar",

Stock.SCHEMA$.getDoc(), Stock.SCHEMA$.getNamespace(), false);

List<Schema.Field> fields = Lists.newArrayList();

for (Schema.Field field : Stock.SCHEMA$.getFields()) {

fields.add(new Schema.Field(field.name(), field.schema(),

field.doc(), field.defaultValue(), field.order()));

}

schema.setFields(fields);

AvroParquetInputFormat.setAvroReadSchema(job, schema);

What if you want to work with the Parquet data natively? Parquet comes with an example object model that allows you to work with any Parquet data, irrespective of the object model that was used to write the data. It uses a Group class to represent records, and provides some basic getters and setters to retrieve fields.

The following code once again shows how to calculate the stock averages. The input is the Avro/Parquet data, and the output is a brand new Parquet schema:[50]

50 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/ExampleParquetMapReduce.java.

private final static String writeSchema = "message stockavg {\n" +

"required binary symbol;\n" +

"required double avg;\n" +

"}";

public void run(Path inputPath, Path outputPath) {

Configuration conf = super.getConf();

Job job = new Job(conf);

job.setJarByClass(ExampleParquetMapReduce.class);

job.setInputFormatClass(ExampleInputFormat.class);

FileInputFormat.setInputPaths(job, inputPath);

job.setMapperClass(Map.class);

job.setReducerClass(Reduce.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(DoubleWritable.class);

job.setOutputFormatClass(ExampleOutputFormat.class);

FileOutputFormat.setOutputPath(job, outputPath);

ExampleOutputFormat.setSchema(

job,

MessageTypeParser.parseMessageType(writeSchema));

}

public static class Map extends Mapper<Void, Group,

Text, DoubleWritable> {

@Override

public void map(Void key, Group value, Context context) {

context.write(new Text(value.getString("symbol", 0)),

new DoubleWritable(Double.valueOf(

value.getValueToString(2, 0))));

}

}

public static class Reduce extends Reducer<Text, DoubleWritable,

Void, Group> {

private SimpleGroupFactory factory;

@Override

protected void setup(Context context) {

factory = new SimpleGroupFactory(

GroupWriteSupport.getSchema(

ContextUtil.getConfiguration(context)));

}

@Override

protected void reduce(Text key, Iterable<DoubleWritable> values,

Context context) {

Mean mean = new Mean();

for (DoubleWritable val : values) {

mean.increment(val.get());

}

Group group = factory.newGroup()

.append("symbol", key.toString())

.append("avg", mean.getResult());

context.write(null, group);

}

}

The example object model is pretty basic and is currently missing some functionality—for example, there are no getters for double types, which is why the preceding code accesses the stock value using the getValueToString method. But there’s work afoot to provide better object models, including a POJO adapter.[51]

51 See the GitHub ticket number 325 titled “Pojo Support for Parquet” at https://github.com/Parquet/parquet-mr/pull/325.

Technique 23 Parquet and Hive/Impala

Parquet comes into its own when utilized in Hive and Impala. Columnar storage is a natural fit for these systems by virtue of its ability to use pushdowns to optimize the read path.[52] This technique shows how Parquet can be used in these systems.

52 Pushdowns are covered in more detail in the next technique.

Problem

You want to be able to work with your Parquet data in Hive and Impala.

Solution

Use Hive’s and Impala’s built-in support for Parquet.

Discussion

Hive requires that data exists in a directory, so you first need to create a directory and copy the stocks Parquet file into it:

$ hadoop fs -mkdir parquet_avro_stocks

$ hadoop fs -cp stocks.parquet parquet_avro_stocks

Next, you’ll create an external Hive table and define the schema. If you’re unsure about the structure of your schema, use one of the earlier techniques to view the schema information in the Parquet files that you’re working with (use the schema command in the Parquet tools):

hive> CREATE EXTERNAL TABLE parquet_stocks(

symbol string,

date string,

open double,

high double,

low double,

close double,

volume int,

adjClose double

) STORED AS PARQUET

LOCATION '/user/YOUR_USERNAME/parquet_avro_stocks';

Hive 0.13

Support for Parquet as a native Hive store was only added in Hive 0.13 (see https://issues.apache.org/jira/browse/HIVE-5783). If you’re using an older version of Hive, you’ll need to manually load all the Parquet JARs using the ADD JAR command and use the Parquet input and output formats. Cloudera has an example on its blog; see “How-to: Use Parquet with Impala, Hive, Pig, and Map-Reduce,” http://blog.cloudera.com/blog/2014/03/how-to-use-parquet-with-impala-hive-pig-mapreduce/.

You can run a simple query to extract the unique stock symbols from the data:

hive> select distinct(symbol) from parquet_stocks;

AAPL

CSCO

GOOG

MSFT

YHOO

You can use the same syntax to create the table in Impala.

Technique 24 Pushdown predicates and projection with Parquet

Projection and predicate pushdowns involve an execution engine pushing the projection and predicates down to the storage format to optimize the operations at the lowest level possible. This yields space and time advantages, as columns that aren’t required for the query don’t need to be fetched and supplied to the execution engine.

This is especially useful for columnar stores, as pushdowns allow the storage format to skip over entire column groups that aren’t required for the query, and columnar formats can perform this operation very efficiently.

In this technique you’ll look at the steps required to use these pushdowns in your Hadoop pipelines.

Problem

You want to use pushdowns in Hadoop to optimize your jobs.

Solution

Using Hive and Pig in conjunction with Parquet provides out-of-the-box projection pushdowns. With MapReduce there are some manual steps that you need to take in the driver code to enable pushdowns.

Discussion

Once again our focus with this technique is Avro. The AvroParquetInputFormat has two methods that you can use for predicate and projection pushdowns. In the following example, only two fields of the Stock object are projected, and a predicate is added so that only Google stocks are selected:[53]

53 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/AvroProjectionParquetMapReduce.java.
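The following sketch shows the general shape of that configuration, using AvroParquetInputFormat.setRequestedProjection for the projection and an UnboundRecordFilter implementation for the predicate; the filter class, field names, and driver/mapper excerpts here are an approximation of the GitHub source rather than a verbatim copy:

// The predicate: only records whose symbol column equals GOOG are kept
public static class GoogleStockFilter implements UnboundRecordFilter {
  private final UnboundRecordFilter filter =
      ColumnRecordFilter.column("symbol", ColumnPredicates.equalTo("GOOG"));

  @Override
  public RecordFilter bind(Iterable<ColumnReader> readers) {
    return filter.bind(readers);
  }
}

// In the driver: register the predicate and project just the symbol and open fields
AvroParquetInputFormat.setUnboundRecordFilter(job, GoogleStockFilter.class);

Schema projection = Schema.createRecord(Stock.SCHEMA$.getName(),
    Stock.SCHEMA$.getDoc(), Stock.SCHEMA$.getNamespace(), false);
List<Schema.Field> fields = Lists.newArrayList();
for (Schema.Field field : Stock.SCHEMA$.getFields()) {
  if ("symbol".equals(field.name()) || "open".equals(field.name())) {
    fields.add(new Schema.Field(field.name(), field.schema(),
        field.doc(), field.defaultValue(), field.order()));
  }
}
projection.setFields(fields);
AvroParquetInputFormat.setRequestedProjection(job, projection);

// In the mapper: records removed by the predicate arrive as null (see the note below)
public void map(Void key, Stock value, Context context)
    throws IOException, InterruptedException {
  if (value != null) {
    context.write(new Text(value.getSymbol().toString()),
                  new DoubleWritable(value.getOpen()));
  }
}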

Predicate filter null values

When the predicate that you supply filters out a record, a null value is supplied to your mapper. That’s why you have to check for null before working with the mapper input.

If you run the job and examine the output, you’ll only find the average for the Google stock, demonstrating that the predicate worked:

$ hip hip.ch3.parquet.AvroProjectionParquetMapReduce \

--input stocks.parquet \

--output output

$ hip --nolib parquet.tools.Main cat output/part-r-00000.parquet

symbol = GOOG

avg = 417.47799999999995

Summary

This technique doesn’t include any Hive or Pig pushdown details, as both tools automatically perform these pushdowns as part of their execution. Pushdowns are an important part of your job-optimization work, and if you’re using a third-party library or tool that doesn’t expose pushdowns when working with Parquet, you can help the community by opening a feature request.

3.4.4. Parquet limitations

There are a number of points that you should be aware of when working with Parquet:

· Parquet requires a lot of memory when writing files because it buffers writes in memory to optimize the encoding and compressing of the data. Either increase the heap size (2 GB is recommended) or decrease the parquet.block.size setting if you encounter memory issues when writing Parquet files.

· Using a heavily nested data structure with Parquet will likely limit some of the optimizations that Parquet makes for pushdowns. If possible, try to flatten your schema.

· Hive doesn’t yet support decimal and timestamp data types when working with Parquet because Parquet doesn’t support them as native types. Work is being tracked in a JIRA ticket titled “Implement all Hive data types in Parquet” (https://issues.apache.org/jira/browse/HIVE-6384).

· Impala doesn’t support nested data in Parquet or complex data types such as maps, structs, or arrays. This should be fixed in the Impala 2.x release.

· Tools such as Impala work best when a Parquet file contains a single row group and when the entire file fits inside an HDFS block. In reality, it’s hard to achieve this goal when you’re writing Parquet files in systems such as MapReduce, but it’s good to keep this in mind as you’re producing Parquet files.
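For the memory issue called out in the first bullet, the knobs are ordinary Hadoop configuration settings. The following driver-side sketch shows one way to apply them; the 64 MB row-group size and the 2 GB heaps are illustrative values rather than universal recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ParquetMemoryTuning {
  public static void tune(Job job) {
    Configuration conf = job.getConfiguration();
    // Shrink Parquet's in-memory row-group buffer (the default is 128 MB).
    conf.setInt("parquet.block.size", 64 * 1024 * 1024);
    // Give the tasks that write Parquet a larger heap.
    conf.set("mapreduce.map.java.opts", "-Xmx2g");
    conf.set("mapreduce.reduce.java.opts", "-Xmx2g");
  }
}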

We’ve covered common file formats as well as data serialization tools that offer tighter compatibility with MapReduce. It’s time to look at how you can support file formats that may be proprietary to your organization, or even public file formats for which no input or output formats exist for MapReduce.

3.5. Custom file formats

In any organization you’ll typically find a plethora of custom or uncommon file formats that litter its datacenters. There may be back-end servers dumping out audit files in a proprietary format, or old code or systems that write files using formats that aren’t in common use any longer. If you want to work with such data in MapReduce, you’ll need to write your own input and output format classes to work with your data. This section will walk you through that process.

3.5.1. Input and output formats

At the start of this chapter, we took a high-level look at the functions of input and output format classes in MapReduce. These classes are required to feed data to map functions and to write the outputs of reduce functions.

Technique 25 Writing input and output formats for CSV

Imagine you have a bunch of data sitting around in CSV files and you’re writing multiple MapReduce jobs that read and write data in CSV form. Because CSV is text, you could use the built-in TextInputFormat and TextOutputFormat, and handle parsing the CSV in your MapReduce code. This can quickly get tiring, however, and result in the same parsing code being copied and pasted across all of your jobs.

If you thought MapReduce had any built-in CSV input and output formats that could take care of this parsing, you’d be out of luck—there are none.

Problem

You want to work with CSV in MapReduce and have CSV records presented to you in a richer form than the single line of text that TextInputFormat would supply.

Solution

Write an input and output format that works with CSV.

Discussion

We’ll cover all of the steps required to write your own format classes to work with CSV input and output. CSV is one of the simpler file formats to work with, which will make it easier to focus on MapReduce format specifics without having to think too much about the file format.

Your custom InputFormat and RecordReader classes will parse CSV files and supply the data to the mapper in a user-friendly format. You’ll also support a custom field separator for non-comma delimiters. Because you don’t want to reinvent the wheel, you’ll use the CSV parser in the open source OpenCSV project (http://opencsv.sourceforge.net/), which will take care of quoted fields and ignore separator characters in quoted fields.

Overview of InputFormat and OutputFormat

I provided a detailed overview of InputFormat and OutputFormat and their related classes at the start of this chapter. It may be worth looking back at that discussion prior to looking at the code in this technique.

The InputFormat

Your first step is to define the InputFormat. The function of InputFormat is to validate the set of inputs supplied to the job, identify input splits, and create a RecordReader class to read input from the sources. The following code reads the separator (if supplied) from the job configuration and constructs a CSVRecordReader:[54]

54 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/csv/CSVInputFormat.java.
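The listing is in the book’s GitHub repository; the following sketch captures its general shape. The configuration-key name, the TextArrayWritable value class, and the CSVRecordReader constructor are defined in the sketches that follow, and you should treat them as illustrative rather than as the repository’s exact code.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CSVInputFormat
    extends FileInputFormat<LongWritable, TextArrayWritable> {

  // Job-configuration key for a custom field separator (defaults to a comma).
  public static final String CSV_TOKEN_SEPARATOR_CONFIG =
      "csvinputformat.token.delimiter";

  @Override
  public RecordReader<LongWritable, TextArrayWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    // Pull the separator (if any) from the job configuration and hand it
    // to the record reader.
    String separator = context.getConfiguration()
        .get(CSV_TOKEN_SEPARATOR_CONFIG);
    return new CSVRecordReader(separator);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // Compressed inputs can't be split (LZOP aside), so report them as
    // non-splittable; plain-text CSV files can be split freely.
    CompressionCodec codec =
        new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return codec == null;
  }
}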

InputFormat and compressed files

In the preceding code, you saw that when the input file was compressed, a flag was returned to indicate that the file couldn’t be split. The reason is that most compression codecs aren’t splittable; LZOP is the exception. But splittable LZOP can’t be used with regular InputFormat classes; it needs special-purpose LZOP InputFormat classes. These details are covered in chapter 4.

Your InputFormat class is complete. Because you extended the FileInputFormat class, which calculates input splits along HDFS block boundaries, you don’t have to compute the input splits yourself; FileInputFormat manages all of the input files and splits for you. Now let’s move on to the RecordReader, which will require a little more effort.

The RecordReader performs two main functions. It must first open the input source based on the input split supplied, optionally seeking to a specific offset in that split. Its second function is to read individual records from the input source.

In this example, a logical record equates to a line in the CSV file, so you’ll use the existing LineRecordReader class in MapReduce to handle working with the file. When the RecordReader is initialized with the InputSplit, it will open the input file, seek to the start of the input split, and keep reading characters until it reaches the start of the next record, which in the case of a line means a newline. The following code shows a simplified version of the LineRecordReader.initialize method:
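The listing isn’t reproduced here; what follows is a simplified paraphrase of Hadoop’s LineRecordReader.initialize, with compression handling and a few edge cases omitted. The start, end, pos, and in fields are instance fields of the reader.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

private long start;     // byte offset where this split begins
private long end;       // byte offset where this split ends
private long pos;       // current position in the file
private LineReader in;  // line-oriented reader over the open stream

public void initialize(InputSplit genericSplit, TaskAttemptContext context)
    throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration conf = context.getConfiguration();

  start = split.getStart();
  end = start + split.getLength();
  Path file = split.getPath();

  // Open the file and seek to the start of this split.
  FileSystem fs = file.getFileSystem(conf);
  FSDataInputStream fileIn = fs.open(file);
  fileIn.seek(start);
  in = new LineReader(fileIn, conf);

  // If this isn't the first split, the first (possibly partial) line belongs
  // to the previous split's reader, so read it and throw it away.
  if (start != 0) {
    start += in.readLine(new Text(), 0,
        (int) Math.min(Integer.MAX_VALUE, end - start));
  }
  pos = start;
}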

The LineRecordReader returns key/value pairs for each line in LongWritable/Text form. Because you want to add some functionality of your own, you need to encapsulate the LineRecordReader within your own RecordReader class. Your RecordReader must supply a key/value pair representation of each record to the mapper; in this case the key is the byte offset in the file, and the value is an array containing the tokenized parts of the CSV line:[55]

55 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/csv/CSVInputFormat.java.
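Again, the listing lives in the repository; here’s a sketch of the wrapper’s skeleton, along with a minimal TextArrayWritable value class. The names mirror the book’s code, but treat the details as illustrative. The record-reading methods follow in the next sketch.

import java.io.IOException;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import au.com.bytecode.opencsv.CSVParser;

// The value type handed to the mapper: the tokenized fields of one CSV line.
class TextArrayWritable extends ArrayWritable {
  public TextArrayWritable() {
    super(Text.class);
  }
}

public class CSVRecordReader
    extends RecordReader<LongWritable, TextArrayWritable> {

  // Delegate the line-oriented work (split boundaries, newlines) to Hadoop's
  // LineRecordReader and layer CSV tokenization on top of it.
  private final LineRecordReader reader = new LineRecordReader();
  private final CSVParser parser;
  private TextArrayWritable value;

  public CSVRecordReader(String separator) {
    // OpenCSV copes with quoted fields and separators embedded inside quotes.
    parser = separator == null
        ? new CSVParser()
        : new CSVParser(separator.charAt(0));
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    reader.initialize(split, context);
  }

  // nextKeyValue(), getCurrentKey(), getCurrentValue(), getProgress(), and
  // close() are shown in the next sketch.
}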

Next you need to provide methods to read the next record and to get at the key and value for that record:[56]

56 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/csv/CSVInputFormat.java.
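A sketch of those methods, continuing the CSVRecordReader class begun above:

// These methods complete the CSVRecordReader sketch started earlier.
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!reader.nextKeyValue()) {
    value = null;
    return false;                       // no more lines in this split
  }
  // Tokenize the raw line and wrap each field as a Text for the mapper.
  String[] tokens = parser.parseLine(reader.getCurrentValue().toString());
  Text[] fields = new Text[tokens.length];
  for (int i = 0; i < tokens.length; i++) {
    fields[i] = new Text(tokens[i]);
  }
  value = new TextArrayWritable();
  value.set(fields);
  return true;
}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
  return reader.getCurrentKey();        // the byte offset of the line in the file
}

@Override
public TextArrayWritable getCurrentValue() throws IOException, InterruptedException {
  return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
  return reader.getProgress();
}

@Override
public void close() throws IOException {
  reader.close();
}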

At this point, you’ve created an InputFormat and a RecordReader that both can work with CSV files. Now that you’ve completed the InputFormat, it’s time to move on to the OutputFormat.

OutputFormat

OutputFormat classes follow a pattern similar to InputFormat classes; the OutputFormat class handles the logistics around creating the output stream and then delegates the stream writes to the RecordWriter.

The CSVOutputFormat indirectly extends the FileOutputFormat class (via TextOutputFormat), which handles all of the logistics related to creating the output filename, creating an instance of a compression codec (if compression was enabled), and output committing, which we’ll discuss shortly.

That leaves the OutputFormat class with the tasks of supporting a custom field delimiter for your CSV output file, and of creating a compressed OutputStream if required. It must also return your CSVRecordWriter, which will write CSV lines to the output stream:[57]

57 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/csv/CSVOutputFormat.java.
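The repository has the full class; the sketch below shows the getRecordWriter method, which reads the separator from the configuration and wraps the output stream in a compression codec when compression is enabled. The configuration-key name is illustrative, and CSVRecordWriter appears in the next listing.

import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class CSVOutputFormat
    extends TextOutputFormat<TextArrayWritable, NullWritable> {

  // Job-configuration key for the output field separator (defaults to a comma).
  public static final String CSV_TOKEN_SEPARATOR_CONFIG =
      "csvoutputformat.token.delimiter";

  @Override
  public RecordWriter<TextArrayWritable, NullWritable> getRecordWriter(
      TaskAttemptContext job) throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    String separator = conf.get(CSV_TOKEN_SEPARATOR_CONFIG, ",");

    if (!getCompressOutput(job)) {
      // Uncompressed: write straight to the file FileOutputFormat chose for us.
      Path file = getDefaultWorkFile(job, "");
      FSDataOutputStream out = file.getFileSystem(conf).create(file, false);
      return new CSVRecordWriter(out, separator);
    }

    // Compressed: wrap the file stream in the configured codec's output stream.
    Class<? extends CompressionCodec> codecClass =
        getOutputCompressorClass(job, GzipCodec.class);
    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
    Path file = getDefaultWorkFile(job, codec.getDefaultExtension());
    FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
    return new CSVRecordWriter(
        new DataOutputStream(codec.createOutputStream(fileOut)), separator);
  }
}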

In the following code, your RecordWriter writes each record emitted by the reducer to the output destination. You require that the reducer output key be in array form representing each token in the CSV line, and you specify that the reducer output value must be a NullWritable, which means that you don’t care about the value part of the output.

Let’s take a look at the CSVRecordWriter class, shown in the following listing; the constructor, which only sets the field separator and the output stream, is excluded.[58]

58 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/csv/CSVOutputFormat.java.

Listing 3.6. A RecordWriter that produces MapReduce output in CSV form
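In place of the listing, here’s a sketch of the class (shown standalone here; the book’s version nests it inside CSVOutputFormat). The trivial constructor is included for completeness even though the book’s listing omits it, and a production version would also quote fields that contain the separator, which is skipped here for brevity.

import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class CSVRecordWriter
    extends RecordWriter<TextArrayWritable, NullWritable> {

  private final DataOutputStream out;
  private final String separator;

  // The constructor the book's listing leaves out: it only stores the
  // separator and the output stream.
  public CSVRecordWriter(DataOutputStream out, String separator) {
    this.out = out;
    this.separator = separator;
  }

  @Override
  public void write(TextArrayWritable key, NullWritable value)
      throws IOException, InterruptedException {
    // Join the tokens with the configured separator and end with a newline.
    StringBuilder line = new StringBuilder();
    Writable[] tokens = key.get();
    for (int i = 0; i < tokens.length; i++) {
      if (i > 0) {
        line.append(separator);
      }
      line.append(tokens[i].toString());
    }
    line.append('\n');
    out.write(line.toString().getBytes("UTF-8"));
  }

  @Override
  public void close(TaskAttemptContext context)
      throws IOException, InterruptedException {
    out.close();
  }
}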

Now you need to apply the new input and output format classes in a MapReduce job.

MapReduce

Your MapReduce job will take CSV as input, and it’ll produce CSV that’s separated by colons, not commas. The job performs identity map and reduce functions, which means that the data isn’t changed as it passes through MapReduce. Your input file is delimited with the tab character, and your output file will be colon-separated. Your input and output format classes support the notion of custom delimiters via Hadoop configuration properties.

The MapReduce code is as follows:[59]

59 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/csv/CSVMapReduce.java.
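The driver, sketched below, wires the custom formats into the job and sets the input and output separators through the configuration keys defined earlier. The identity mapper and reducer it references appear in the next sketch, and all names remain illustrative; the book’s version parses --input and --output flags rather than positional arguments.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CSVMapReduce {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Tab-delimited input, colon-delimited output, as described above.
    conf.set(CSVInputFormat.CSV_TOKEN_SEPARATOR_CONFIG, "\t");
    conf.set(CSVOutputFormat.CSV_TOKEN_SEPARATOR_CONFIG, ":");

    Job job = Job.getInstance(conf, "CSV identity job");
    job.setJarByClass(CSVMapReduce.class);

    // Plug in the custom CSV input and output formats.
    job.setInputFormatClass(CSVInputFormat.class);
    job.setOutputFormatClass(CSVOutputFormat.class);

    job.setMapperClass(IdentityCsvMapper.class);
    job.setReducerClass(IdentityCsvReducer.class);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(TextArrayWritable.class);
    job.setOutputKeyClass(TextArrayWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}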

The map and reduce functions don’t do much other than echo their inputs to output, but they’re included so you can see how to work with the CSV in your MapReduce code:[60]

60 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/csv/CSVMapReduce.java.
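A sketch of those identity functions (again with illustrative class names), matching the driver above:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// The mapper passes each tokenized CSV line through untouched.
public class IdentityCsvMapper
    extends Mapper<LongWritable, TextArrayWritable, LongWritable, TextArrayWritable> {
  @Override
  protected void map(LongWritable offset, TextArrayWritable tokens, Context ctx)
      throws IOException, InterruptedException {
    // "tokens" already holds the parsed CSV fields, courtesy of CSVRecordReader.
    ctx.write(offset, tokens);
  }
}

// The reducer writes the tokens back out; CSVOutputFormat joins them with the
// configured separator.
public class IdentityCsvReducer
    extends Reducer<LongWritable, TextArrayWritable, TextArrayWritable, NullWritable> {
  @Override
  protected void reduce(LongWritable offset, Iterable<TextArrayWritable> values, Context ctx)
      throws IOException, InterruptedException {
    for (TextArrayWritable tokens : values) {
      ctx.write(tokens, NullWritable.get());
    }
  }
}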

If you run this example MapReduce job against a tab-delimited file, you can examine the job output and see whether the results are as expected:

$ hadoop fs -put test-data/stocks.txt stocks.txt

$ hip hip.ch3.csv.CSVMapReduce \

--input stocks.txt \

--output output

$ hadoop fs -cat output/part*

AAPL:2009-01-02:85.88:91.04:85.16:90.75:26643400:90.75

AAPL:2008-01-02:199.27:200.26:192.55:194.84:38542100:194.84

AAPL:2007-01-03:86.29:86.58:81.90:83.80:44225700:83.80

...

You now have a functional InputFormat and OutputFormat that can consume and produce CSV in MapReduce.

Pig

Pig’s piggybank library contains a CSVLoader that can be used to load CSV files into tuples. It supports double-quoted fields in the CSV records and provides each item as a byte array.

Hive

There’s a GitHub project called csv-serde (https://github.com/ogrodnek/csv-serde), which has a Hive SerDe that can both serialize and deserialize CSV. Like the previous InputFormat example, it also uses the OpenCSV project for reading and writing CSV.

Summary

This technique demonstrated how you can write your own MapReduce format classes to work with text-based data. Work is currently underway in MapReduce to add a CSV input format (see https://issues.apache.org/jira/browse/MAPREDUCE-2208).

Arguably, it would have been simpler to use the TextInputFormat and split the line in the mapper. But if you need to do this multiple times, you’re likely suffering from the copy-paste antipattern, because the same code to tokenize the CSV will end up in multiple locations. Writing the parsing once, inside input and output format classes, keeps that logic in a single reusable place.

We’ve looked at how you can write your own I/O format classes to work with a custom file format in MapReduce. Now we need to look at a crucial aspect of working with output formats—output committing.

3.5.2. The importance of output committing

In the CSV OutputFormat example in the previous technique, you extended FileOutputFormat, which takes care of committing output after the task has succeeded. Why do you need commits in MapReduce, and why should you care?

As a job and its tasks are executing, they will start writing job output at some point. Tasks and jobs can fail, they can be restarted, and they can also be speculatively executed.[61] To allow OutputFormats to correctly handle these scenarios, MapReduce has the notion of an OutputCommitter, a mechanism by which MapReduce invokes a callback when an individual task, as well as the overall job, has completed.

61 Speculative execution is when MapReduce runs multiple attempts of the same task over the same input data to guard against slow or misbehaving nodes slowing down the overall job. By default, both map-side and reduce-side speculative execution are enabled. The mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution properties control this behavior.

Most OutputFormats in MapReduce extend FileOutputFormat, which uses FileOutputCommitter for its output committing. When the FileOutputFormat is initially consulted about the location of the output files, it delegates the decision of where the output should be located to the FileOutputCommitter, which in turn specifies that the output should go to a temporary directory under the job output directory (<job-output>/_temporary/<task-attempt-id>). Only after the task has completed will the FileOutputCommitter be notified, at which point the temporary output is moved to the job output directory. When the overall job has successfully completed, the FileOutputCommitter is again notified, and this time it touches a _SUCCESS file in the job output directory to help downstream processors know the job succeeded.

This is great if your data sink is HDFS, where you can use FileOutputFormat and its committing mechanism. Things start to get trickier when you’re working with data sources other than files, such as a database. If, in such cases, idempotent writes (where the same operation can be applied multiple times without changing the result) are necessary, you’ll need to factor that into the design of your destination data store or OutputFormat.

This topic is examined in more detail in chapter 5, which covers exporting data from Hadoop to databases.

3.6. Chapter summary

The goal for this chapter was to show you how to work with common file formats such as XML and JSON in MapReduce. We also looked at more sophisticated file formats such as SequenceFile, Avro, and Parquet, which provide useful features for working with big data, such as versioning, compression, and complex data structures. We also walked through the process of working with custom file formats to ensure they’ll work in MapReduce.

At this point, you’re equipped to work with any file format in MapReduce. Now it’s time to look at some storage patterns so you can effectively work with your data and optimize storage and disk/network I/O.