
Part IV. Related Projects

Chapter 12. Avro

Apache Avro[79] is a language-neutral data serialization system. The project was created by Doug Cutting (the creator of Hadoop) to address the major downside of Hadoop Writables: lack of language portability. Having a data format that can be processed by many languages (currently C, C++, C#, Java, JavaScript, Perl, PHP, Python, and Ruby) makes it easier to share datasets with a wider audience than one tied to a single language. It is also more future-proof, allowing data to potentially outlive the language used to read and write it.

But why a new data serialization system? Avro has a set of features that, taken together, differentiate it from other systems such as Apache Thrift or Google’s Protocol Buffers.[80] Like in these systems and others, Avro data is described using a language-independent schema. However, unlike in some other systems, code generation is optional in Avro, which means you can read and write data that conforms to a given schema even if your code has not seen that particular schema before. To achieve this, Avro assumes that the schema is always present — at both read and write time — which makes for a very compact encoding, since encoded values do not need to be tagged with a field identifier.

Avro schemas are usually written in JSON, and data is usually encoded using a binary format, but there are other options, too. There is a higher-level language called Avro IDL for writing schemas in a C-like language that is more familiar to developers. There is also a JSON-based data encoder, which, being human readable, is useful for prototyping and debugging Avro data.

The Avro specification precisely defines the binary format that all implementations must support. It also specifies many of the other features of Avro that implementations should support. One area that the specification does not rule on, however, is APIs: implementations have complete latitude in the APIs they expose for working with Avro data, since each one is necessarily language specific. The fact that there is only one binary format is significant, because it means the barrier for implementing a new language binding is lower and avoids the problem of a combinatorial explosion of languages and formats, which would harm interoperability.

Avro has rich schema resolution capabilities. Within certain carefully defined constraints, the schema used to read data need not be identical to the schema that was used to write the data. This is the mechanism by which Avro supports schema evolution. For example, a new, optional field may be added to a record by declaring it in the schema used to read the old data. New and old clients alike will be able to read the old data, while new clients can write new data that uses the new field. Conversely, if an old client sees newly encoded data, it will gracefully ignore the new field and carry on processing as it would have done with old data.

Avro specifies an object container format for sequences of objects, similar to Hadoop’s sequence file. An Avro datafile has a metadata section where the schema is stored, which makes the file self-describing. Avro datafiles support compression and are splittable, which is crucial for a MapReduce data input format. In fact, support goes beyond MapReduce: all of the data processing frameworks in this book (Pig, Hive, Crunch, Spark) can read and write Avro datafiles.

Avro can be used for RPC, too, although this isn’t covered here. More information is in the specification.

Avro Data Types and Schemas

Avro defines a small number of primitive data types, which can be used to build application-specific data structures by writing schemas. For interoperability, implementations must support all Avro types.

Avro’s primitive types are listed in Table 12-1. Each primitive type may also be specified using a more verbose form by using the type attribute, such as:

{ "type": "null" }

Table 12-1. Avro primitive types

Type      Description                                                 Schema
null      The absence of a value                                      "null"
boolean   A binary value                                              "boolean"
int       32-bit signed integer                                       "int"
long      64-bit signed integer                                       "long"
float     Single-precision (32-bit) IEEE 754 floating-point number    "float"
double    Double-precision (64-bit) IEEE 754 floating-point number    "double"
bytes     Sequence of 8-bit unsigned bytes                            "bytes"
string    Sequence of Unicode characters                              "string"

Avro also defines the complex types listed in Table 12-2, along with a representative example of a schema of each type.

Table 12-2. Avro complex types

array
    An ordered collection of objects. All objects in a particular array must have the same schema.
    {
      "type": "array",
      "items": "long"
    }

map
    An unordered collection of key-value pairs. Keys must be strings and values may be any type, although within a particular map, all values must have the same schema.
    {
      "type": "map",
      "values": "string"
    }

record
    A collection of named fields of any type.
    {
      "type": "record",
      "name": "WeatherRecord",
      "doc": "A weather reading.",
      "fields": [
        {"name": "year", "type": "int"},
        {"name": "temperature", "type": "int"},
        {"name": "stationId", "type": "string"}
      ]
    }

enum
    A set of named values.
    {
      "type": "enum",
      "name": "Cutlery",
      "doc": "An eating utensil.",
      "symbols": ["KNIFE", "FORK", "SPOON"]
    }

fixed
    A fixed number of 8-bit unsigned bytes.
    {
      "type": "fixed",
      "name": "Md5Hash",
      "size": 16
    }

union
    A union of schemas. A union is represented by a JSON array, where each element in the array is a schema. Data represented by a union must match one of the schemas in the union.
    [
      "null",
      "string",
      {"type": "map", "values": "string"}
    ]

Each Avro language API has a representation for each Avro type that is specific to the language. For example, Avro’s double type is represented in C, C++, and Java by a double, in Python by a float, and in Ruby by a Float.

What’s more, there may be more than one representation, or mapping, for a language. All languages support a dynamic mapping, which can be used even when the schema is not known ahead of runtime. Java calls this the Generic mapping.

In addition, the Java and C++ implementations can generate code to represent the data for an Avro schema. Code generation, which is called the Specific mapping in Java, is an optimization that is useful when you have a copy of the schema before you read or write data. Generated classes also provide a more domain-oriented API for user code than Generic ones.

Java has a third mapping, the Reflect mapping, which maps Avro types onto preexisting Java types using reflection. It is slower than the Generic and Specific mappings but can be a convenient way of defining a type, since Avro can infer a schema automatically.
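As a hedged illustration of the Reflect mapping, the following sketch defines a hypothetical plain Java class (StringPairPojo is our own name, not part of the book’s example code) and lets Avro infer an equivalent record schema from it via reflection:

// Sketch only: the Reflect mapping inferring a schema from a POJO.
// StringPairPojo is a hypothetical class with two public String fields.
public class StringPairPojo {
  public String left;
  public String right;
}

// Elsewhere in the application, infer and inspect the schema:
Schema inferred = ReflectData.get().getSchema(StringPairPojo.class);
System.out.println(inferred.toString(true)); // pretty-printed JSON record schema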

Java’s type mappings are shown in Table 12-3. As the table shows, the Specific mapping is the same as the Generic one unless otherwise noted (and the Reflect one is the same as the Specific one unless noted). The Specific mapping differs from the Generic one only for record, enum, and fixed, all of which have generated classes (the names of which are controlled by the name and optional namespace attributes).

Table 12-3. Avro Java type mappings
(Mappings not listed for a type are the same as the one to their left: Specific defaults to Generic, and Reflect defaults to Specific.)

null
    Generic: null type
boolean
    Generic: boolean
int
    Generic: int
    Reflect: byte, short, int, or char
long
    Generic: long
float
    Generic: float
double
    Generic: double
bytes
    Generic: java.nio.ByteBuffer
    Reflect: array of bytes
string
    Generic: org.apache.avro.util.Utf8 or java.lang.String
    Reflect: java.lang.String
array
    Generic: org.apache.avro.generic.GenericArray
    Reflect: array or java.util.Collection
map
    Generic: java.util.Map
record
    Generic: org.apache.avro.generic.GenericRecord
    Specific: generated class implementing org.apache.avro.specific.SpecificRecord
    Reflect: arbitrary user class with a zero-argument constructor; all inherited nontransient instance fields are used
enum
    Generic: java.lang.String
    Specific: generated Java enum
    Reflect: arbitrary Java enum
fixed
    Generic: org.apache.avro.generic.GenericFixed
    Specific: generated class implementing org.apache.avro.specific.SpecificFixed
    Reflect: org.apache.avro.generic.GenericFixed
union
    Generic: java.lang.Object

NOTE

Avro string can be represented by either Java String or the Avro Utf8 Java type. The reason to use Utf8 is efficiency: because it is mutable, a single Utf8 instance may be reused for reading or writing a series of values. Also, Java String decodes UTF-8 at object construction time, whereas Avro Utf8 does it lazily, which can increase performance in some cases.

Utf8 implements Java’s java.lang.CharSequence interface, which allows some interoperability with Java libraries. In other cases, it may be necessary to convert Utf8 instances to String objects by calling its toString() method.

Utf8 is the default for Generic and Specific, but it’s possible to use String for a particular mapping. There are a couple of ways to achieve this. The first is to set the avro.java.string property in the schema to String:

{ "type": "string", "avro.java.string": "String" }

Alternatively, for the Specific mapping, you can generate classes that have String-based getters and setters. When using the Avro Maven plug-in, this is done by setting the configuration property stringType to String (see The Specific API for a demonstration of this).

Finally, note that the Java Reflect mapping always uses String objects, since it is designed for Java compatibility, not performance.

In-Memory Serialization and Deserialization

Avro provides APIs for serialization and deserialization that are useful when you want to integrate Avro with an existing system, such as a messaging system where the framing format is already defined. In other cases, consider using Avro’s datafile format.

Let’s write a Java program to read and write Avro data from and to streams. We’ll start with a simple Avro schema for representing a pair of strings as a record:

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings.",
  "fields": [
    {"name": "left", "type": "string"},
    {"name": "right", "type": "string"}
  ]
}

If this schema is saved in a file on the classpath called StringPair.avsc (.avsc is the conventional extension for an Avro schema), we can load it using the following two lines of code:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(
    getClass().getResourceAsStream("StringPair.avsc"));

We can create an instance of an Avro record using the Generic API as follows:

GenericRecord datum = new GenericData.Record(schema);
datum.put("left", "L");
datum.put("right", "R");

Next, we serialize the record to an output stream:

ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer =
    new GenericDatumWriter<GenericRecord>(schema);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(datum, encoder);
encoder.flush();
out.close();

There are two important objects here: the DatumWriter and the Encoder. A DatumWriter translates data objects into the types understood by an Encoder, which then writes them to the output stream. Here we are using a GenericDatumWriter, which passes the fields of GenericRecord to the Encoder. We pass null to the encoder factory because we are not reusing a previously constructed encoder here.

In this example, only one object is written to the stream, but we could call write() with more objects before closing the stream if we wanted to.

The GenericDatumWriter needs to be passed the schema because it follows the schema to determine which values from the data objects to write out. After we have called the writer’s write() method, we flush the encoder, then close the output stream.
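As a small sketch of writing several objects to one stream (the variable names here are ours, not from the book’s example code), note that the previously constructed encoder can also be handed back to the factory for reuse:

// Sketch: serializing several records with one DatumWriter, reusing the encoder.
ByteArrayOutputStream stream = new ByteArrayOutputStream();
DatumWriter<GenericRecord> datumWriter =
    new GenericDatumWriter<GenericRecord>(schema);
BinaryEncoder reusableEncoder = null; // nothing to reuse the first time
reusableEncoder = EncoderFactory.get().binaryEncoder(stream, reusableEncoder);
for (GenericRecord d : Arrays.asList(datum, datum)) { // any number of records
  datumWriter.write(d, reusableEncoder);
}
reusableEncoder.flush();
stream.close();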

We can reverse the process and read the object back from the byte buffer:

DatumReader<GenericRecord> reader =
    new GenericDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord result = reader.read(null, decoder);
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));

We pass null to the calls to binaryDecoder() and read() because we are not reusing objects here (the decoder or the record, respectively).

The objects returned by result.get("left") and result.get("right") are of type Utf8, so we convert them into Java String objects by calling their toString() methods.

The Specific API

Let’s look now at the equivalent code using the Specific API. We can generate the StringPair class from the schema file by using Avro’s Maven plug-in for compiling schemas. The following is the relevant part of the Maven Project Object Model (POM):

<project>
  ...
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>${avro.version}</version>
        <executions>
          <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <includes>
                <include>StringPair.avsc</include>
              </includes>
              <stringType>String</stringType>
              <sourceDirectory>src/main/resources</sourceDirectory>
              <outputDirectory>${project.build.directory}/generated-sources/java
              </outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  ...
</project>

As an alternative to Maven, you can use Avro’s Ant task, org.apache.avro.specific.SchemaTask, or the Avro command-line tools[81] to generate Java code for a schema.
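For instance, the command-line tools can compile a schema into Java sources with something like the following (the output directory is our choice, and the exact options may vary between Avro releases; run the tool with no arguments to see its usage):

% java -jar $AVRO_HOME/avro-tools-*.jar compile schema StringPair.avsc \
    src/main/java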

In the code for serializing and deserializing, instead of a GenericRecord we construct a StringPair instance, which we write to the stream using a SpecificDatumWriter and read back using a SpecificDatumReader:

StringPair datum = new StringPair();
datum.setLeft("L");
datum.setRight("R");

ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<StringPair> writer =
    new SpecificDatumWriter<StringPair>(StringPair.class);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(datum, encoder);
encoder.flush();
out.close();

DatumReader<StringPair> reader =
    new SpecificDatumReader<StringPair>(StringPair.class);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
StringPair result = reader.read(null, decoder);
assertThat(result.getLeft(), is("L"));
assertThat(result.getRight(), is("R"));

Avro Datafiles

Avro’s object container file format is for storing sequences of Avro objects. It is very similar in design to Hadoop’s sequence file format, described in SequenceFile. The main difference is that Avro datafiles are designed to be portable across languages, so, for example, you can write a file in Python and read it in C (we will do exactly this in the next section).

A datafile has a header containing metadata, including the Avro schema and a sync marker, followed by a series of (optionally compressed) blocks containing the serialized Avro objects. Blocks are separated by a sync marker that is unique to the file (the marker for a particular file is found in the header) and that permits rapid resynchronization with a block boundary after seeking to an arbitrary point in the file, such as an HDFS block boundary. Thus, Avro datafiles are splittable, which makes them amenable to efficient MapReduce processing.

Writing Avro objects to a datafile is similar to writing to a stream. We use a DatumWriter as before, but instead of using an Encoder, we create a DataFileWriter instance with the DatumWriter. Then we can create a new datafile (which, by convention, has a .avro extension) and append objects to it:

File file = new File("data.avro");
DatumWriter<GenericRecord> writer =
    new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter =
    new DataFileWriter<GenericRecord>(writer);
dataFileWriter.create(schema, file);
dataFileWriter.append(datum);
dataFileWriter.close();

The objects that we write to the datafile must conform to the file’s schema; otherwise, an exception will be thrown when we call append().

This example demonstrates writing to a local file (java.io.File in the previous snippet), but we can write to any java.io.OutputStream by using the overloaded create() method on DataFileWriter. To write a file to HDFS, for example, we get an OutputStream by calling create() on FileSystem (see Writing Data).
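A minimal sketch of this, assuming a Hadoop Configuration on the classpath and a destination path of our own choosing (block compression via setCodec() is optional), might look like the following:

// Sketch only: writing an Avro datafile to HDFS through an OutputStream.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
OutputStream out = fs.create(new Path("pairs.avro")); // path is an assumption
DataFileWriter<GenericRecord> dataFileWriter =
    new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
dataFileWriter.setCodec(CodecFactory.deflateCodec(1)); // optional block compression
dataFileWriter.create(schema, out);
dataFileWriter.append(datum);
dataFileWriter.close();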

Reading back objects from a datafile is similar to the earlier case of reading objects from an in-memory stream, with one important difference: we don’t have to specify a schema, since it is read from the file metadata. Indeed, we can get the schema from the DataFileReader instance, using getSchema(), and verify that it is the same as the one we used to write the original object:

DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> dataFileReader =
    new DataFileReader<GenericRecord>(file, reader);
assertThat("Schema is the same", schema, is(dataFileReader.getSchema()));

DataFileReader is a regular Java iterator, so we can iterate through its data objects by calling its hasNext() and next() methods. The following snippet checks that there is only one record and that it has the expected field values:

assertThat(dataFileReader.hasNext(), is(true));
GenericRecord result = dataFileReader.next();
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));
assertThat(dataFileReader.hasNext(), is(false));

Rather than using the usual next() method, however, it is preferable to use the overloaded form that takes an instance of the object to be returned (in this case, GenericRecord), since it will reuse the object and save allocation and garbage collection costs for files containing many objects. The following is idiomatic:

GenericRecord record = null;
while (dataFileReader.hasNext()) {
  record = dataFileReader.next(record);
  // process record
}

If object reuse is not important, you can use this shorter form:

for (GenericRecord record : dataFileReader) {
  // process record
}

For the general case of reading a file on a Hadoop filesystem, use Avro’s FsInput to specify the input file using a Hadoop Path object. DataFileReader actually offers random access to Avro datafiles (via its seek() and sync() methods); however, in many cases, sequential streaming access is sufficient, for which DataFileStream should be used. DataFileStream can read from any Java InputStream.
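Here is a brief sketch of reading a datafile from a Hadoop filesystem with FsInput (the path is an assumption for illustration):

// Sketch only: reading an Avro datafile from HDFS via FsInput.
Configuration conf = new Configuration();
FsInput in = new FsInput(new Path("pairs.avro"), conf);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> fileReader =
    new DataFileReader<GenericRecord>(in, datumReader);
try {
  for (GenericRecord r : fileReader) {
    // process r
  }
} finally {
  fileReader.close();
}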

Interoperability

To demonstrate Avro’s language interoperability, let’s write a datafile using one language (Python) and read it back with another (Java).

Python API

The program in Example 12-1 reads comma-separated strings from standard input and writes them as StringPair records to an Avro datafile. As in the Java code for writing a datafile, we create a DatumWriter and a DataFileWriter object. Notice that we have embedded the Avro schema in the code, although we could equally well have read it from a file.

Python represents Avro records as dictionaries; each line that is read from standard input is turned into a dict object and appended to the DataFileWriter.

Example 12-1. A Python program for writing Avro record pairs to a datafile

import os
import string
import sys

from avro import schema
from avro import io
from avro import datafile

if __name__ == '__main__':
  if len(sys.argv) != 2:
    sys.exit('Usage: %s <data_file>' % sys.argv[0])
  avro_file = sys.argv[1]
  writer = open(avro_file, 'wb')
  datum_writer = io.DatumWriter()
  schema_object = schema.parse("""\
{ "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings.",
  "fields": [
    {"name": "left", "type": "string"},
    {"name": "right", "type": "string"}
  ]
}""")
  dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
  for line in sys.stdin.readlines():
    (left, right) = string.split(line.strip(), ',')
    dfw.append({'left': left, 'right': right})
  dfw.close()

Before we can run the program, we need to install Avro for Python:

% easy_install avro

To run the program, we specify the name of the file to write output to (pairs.avro) and send input pairs over standard input, marking the end of file by typing Ctrl-D:

% python ch12-avro/src/main/py/write_pairs.py pairs.avro
a,1
c,2
b,3
b,2
^D

Avro Tools

Next, we’ll use the Avro tools (written in Java) to display the contents of pairs.avro. The tools JAR is available from the Avro website; here we assume it’s been placed in a local directory called $AVRO_HOME. The tojson command converts an Avro datafile to JSON and prints it to the console:

% java -jar $AVRO_HOME/avro-tools-*.jar tojson pairs.avro
{"left":"a","right":"1"}
{"left":"c","right":"2"}
{"left":"b","right":"3"}
{"left":"b","right":"2"}

We have successfully exchanged complex data between two Avro implementations (Python and Java).

Schema Resolution

We can choose to use a different schema for reading the data back (the reader’s schema) from the one we used to write it (the writer’s schema). This is a powerful tool because it enables schema evolution. To illustrate, consider a new schema for string pairs with an added description field:

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings with an added field.",
  "fields": [
    {"name": "left", "type": "string"},
    {"name": "right", "type": "string"},
    {"name": "description", "type": "string", "default": ""}
  ]
}

We can use this schema to read the data we serialized earlier because, crucially, we have given the description field a default value (the empty string),[82] which Avro will use when there is no such field defined in the records it is reading. Had we omitted the default attribute, we would get an error when trying to read the old data.

NOTE

To make the default value null rather than the empty string, we would instead define the description field using a union with the null Avro type:

{"name": "description", "type": ["null", "string"], "default": null}

When the reader’s schema is different from the writer’s, we use the constructor for GenericDatumReader that takes two schema objects, the writer’s and the reader’s, in that order:

DatumReader<GenericRecord> reader =
    new GenericDatumReader<GenericRecord>(schema, newSchema);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord result = reader.read(null, decoder);
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));
assertThat(result.get("description").toString(), is(""));

For datafiles, which have the writer’s schema stored in the metadata, we only need to specify the reader’s schema explicitly, which we can do by passing null for the writer’s schema:

DatumReader<GenericRecord> reader =
    new GenericDatumReader<GenericRecord>(null, newSchema);

Another common use of a different reader’s schema is to drop fields in a record, an operation called projection. This is useful when you have records with a large number of fields and you want to read only some of them. For example, this schema can be used to get only the right field of a StringPair:

{
  "type": "record",
  "name": "StringPair",
  "doc": "The right field of a pair of strings.",
  "fields": [
    {"name": "right", "type": "string"}
  ]
}

The rules for schema resolution have a direct bearing on how schemas may evolve from one version to the next, and are spelled out in the Avro specification for all Avro types. A summary of the rules for record evolution from the point of view of readers and writers (or servers and clients) is presented in Table 12-4, and a programmatic compatibility check is sketched just after the table.

Table 12-4. Schema resolution of records

New schema      Writer   Reader   Action
Added field     Old      New      The reader uses the default value of the new field,
                                  since it is not written by the writer.
                New      Old      The reader does not know about the new field written
                                  by the writer, so it is ignored (projection).
Removed field   Old      New      The reader ignores the removed field (projection).
                New      Old      The removed field is not written by the writer. If the
                                  old schema had a default defined for the field, the
                                  reader uses this; otherwise, it gets an error. In this
                                  case, it is best to update the reader's schema, either
                                  at the same time as or before the writer's.
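As a hedged illustration of these rules in code, recent Avro releases ship a SchemaCompatibility helper that reports whether a reader’s schema can resolve data written with a writer’s schema. The schema file names below are assumptions for this sketch, not files from the book’s example code:

// Sketch only: checking reader/writer compatibility programmatically
// (assumes an Avro release that includes org.apache.avro.SchemaCompatibility).
Schema writerSchema = new Schema.Parser().parse(new File("StringPair.avsc"));
Schema readerSchema = new Schema.Parser().parse(
    new File("StringPairWithDescription.avsc")); // hypothetical reader's schema
SchemaCompatibility.SchemaPairCompatibility compat =
    SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
System.out.println(compat.getType()); // COMPATIBLE if the reader can resolve the data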

Another useful technique for evolving Avro schemas is the use of name aliases. Aliases allow you to use different names in the schema used to read the Avro data than in the schema originally used to write the data. For example, the following reader’s schema can be used to read StringPair data with the new field names first and second instead of left and right (which are what it was written with):

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings with aliased field names.",
  "fields": [
    {"name": "first", "type": "string", "aliases": ["left"]},
    {"name": "second", "type": "string", "aliases": ["right"]}
  ]
}

Note that the aliases are used to translate (at read time) the writer’s schema into the reader’s, but the alias names are not available to the reader. In this example, the reader cannot use the field names left and right, because they have already been translated to first and second.

Sort Order

Avro defines a sort order for objects. For most Avro types, the order is the natural one you would expect — for example, numeric types are ordered by ascending numeric value. Others are a little more subtle. For instance, enums are compared by the order in which the symbols are defined and not by the values of the symbol strings.

All types except record have preordained rules for their sort order, as described in the Avro specification, that cannot be overridden by the user. For records, however, you can control the sort order by specifying the order attribute for a field. It takes one of three values: ascending (the default), descending (to reverse the order), or ignore (so the field is skipped for comparison purposes).

For example, the following schema (SortedStringPair.avsc) defines an ordering of StringPair records by the right field in descending order. The left field is ignored for the purposes of ordering, but it is still present in the projection:

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings, sorted by right field descending.",
  "fields": [
    {"name": "left", "type": "string", "order": "ignore"},
    {"name": "right", "type": "string", "order": "descending"}
  ]
}

The record’s fields are compared pairwise in the document order of the reader’s schema. Thus, by specifying an appropriate reader’s schema, you can impose an arbitrary ordering on data records. This schema (SwitchedStringPair.avsc) defines a sort order by the right field, then the left:

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings, sorted by right then left.",
  "fields": [
    {"name": "right", "type": "string"},
    {"name": "left", "type": "string"}
  ]
}

Avro implements efficient binary comparisons. That is to say, Avro does not have to deserialize binary data into objects to perform the comparison, because it can instead work directly on the byte streams.[83] In the case of the original StringPair schema (with no order attributes), for example, Avro implements the binary comparison as follows.

The first field, left, is a UTF-8-encoded string, for which Avro can compare the bytes lexicographically. If they differ, the order is determined, and Avro can stop the comparison there. Otherwise, if the two byte sequences are the same, it compares the two right fields, again lexicographically at the byte level, since that field is another UTF-8 string.

Notice that this description of a comparison function has exactly the same logic as the binary comparator we wrote for Writables in Implementing a RawComparator for speed. The great thing is that Avro provides the comparator for us, so we don’t have to write and maintain this code. It’s also easy to change the sort order just by changing the reader’s schema. For the SortedStringPair.avsc and SwitchedStringPair.avsc schemas, the comparison function Avro uses is essentially the same as the one just described. The differences are which fields are considered, the order in which they are considered, and whether the sort order is ascending or descending.
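To make the binary comparison concrete, the following sketch compares two encoded StringPair datums directly on their bytes using BinaryData; the two output streams are assumed to each hold a single record encoded as shown earlier in the chapter:

// Sketch only: comparing two binary-encoded records without deserializing them.
byte[] b1 = out1.toByteArray(); // encoding of one StringPair (hypothetical stream)
byte[] b2 = out2.toByteArray(); // encoding of another StringPair
int cmp = BinaryData.compare(b1, 0, b2, 0, schema);
// cmp is negative, zero, or positive according to the schema's sort order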

Later in the chapter, we’ll use Avro’s sorting logic in conjunction with MapReduce to sort Avro datafiles in parallel.

Avro MapReduce

Avro provides a number of classes for making it easy to run MapReduce programs on Avro data. We’ll use the new MapReduce API classes from the org.apache.avro.mapreduce package, but you can find (old-style) MapReduce classes in the org.apache.avro.mapred package.

Let’s rework the MapReduce program for finding the maximum temperature for each year in the weather dataset, this time using the Avro MapReduce API. We will represent weather records using the following schema:

{
  "type": "record",
  "name": "WeatherRecord",
  "doc": "A weather reading.",
  "fields": [
    {"name": "year", "type": "int"},
    {"name": "temperature", "type": "int"},
    {"name": "stationId", "type": "string"}
  ]
}

The program in Example 12-2 reads text input (in the format we saw in earlier chapters) and writes Avro datafiles containing weather records as output.

Example 12-2. MapReduce program to find the maximum temperature, creating Avro output

public class AvroGenericMaxTemperature extends Configured implements Tool {

  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"WeatherRecord\"," +
      "  \"doc\": \"A weather reading.\"," +
      "  \"fields\": [" +
      "    {\"name\": \"year\", \"type\": \"int\"}," +
      "    {\"name\": \"temperature\", \"type\": \"int\"}," +
      "    {\"name\": \"stationId\", \"type\": \"string\"}" +
      "  ]" +
      "}"
  );

  public static class MaxTemperatureMapper
      extends Mapper<LongWritable, Text, AvroKey<Integer>,
          AvroValue<GenericRecord>> {
    private NcdcRecordParser parser = new NcdcRecordParser();
    private GenericRecord record = new GenericData.Record(SCHEMA);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      parser.parse(value.toString());
      if (parser.isValidTemperature()) {
        record.put("year", parser.getYearInt());
        record.put("temperature", parser.getAirTemperature());
        record.put("stationId", parser.getStationId());
        context.write(new AvroKey<Integer>(parser.getYearInt()),
            new AvroValue<GenericRecord>(record));
      }
    }
  }

  public static class MaxTemperatureReducer
      extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>,
          AvroKey<GenericRecord>, NullWritable> {

    @Override
    protected void reduce(AvroKey<Integer> key,
        Iterable<AvroValue<GenericRecord>> values, Context context)
        throws IOException, InterruptedException {
      GenericRecord max = null;
      for (AvroValue<GenericRecord> value : values) {
        GenericRecord record = value.datum();
        if (max == null ||
            (Integer) record.get("temperature") > (Integer) max.get("temperature")) {
          max = newWeatherRecord(record);
        }
      }
      context.write(new AvroKey(max), NullWritable.get());
    }

    private GenericRecord newWeatherRecord(GenericRecord value) {
      GenericRecord record = new GenericData.Record(SCHEMA);
      record.put("year", value.get("year"));
      record.put("temperature", value.get("temperature"));
      record.put("stationId", value.get("stationId"));
      return record;
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    Job job = new Job(getConf(), "Max temperature");
    job.setJarByClass(getClass());
    job.getConfiguration().setBoolean(
        Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
    AvroJob.setMapOutputValueSchema(job, SCHEMA);
    AvroJob.setOutputKeySchema(job, SCHEMA);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
    System.exit(exitCode);
  }
}

This program uses the Generic Avro mapping. This frees us from generating code to represent records, at the expense of type safety (field names are referred to by string value, such as "temperature").[84] The schema for weather records is inlined in the code for convenience (and read into the SCHEMA constant), although in practice it might be more maintainable to read the schema from a local file in the driver code and pass it to the mapper and reducer via the Hadoop job configuration. (Techniques for achieving this are discussed in Side Data Distribution.)

There are a couple of differences from the regular Hadoop MapReduce API. The first is the use of wrappers around Avro Java types. For this MapReduce program, the key is the year (an integer), and the value is the weather record, which is represented by Avro’s GenericRecord. This translates to AvroKey<Integer> for the key type and AvroValue<GenericRecord> for the value type in the map output (and reduce input).

The MaxTemperatureReducer iterates through the records for each key (year) and finds the one with the maximum temperature. It is necessary to make a copy of the record with the highest temperature found so far, since the iterator reuses the instance for reasons of efficiency (and only the fields are updated).

The second major difference from regular MapReduce is the use of AvroJob for configuring the job. AvroJob is a convenience class for specifying the Avro schemas for the input, map output, and final output data. In this program, no input schema is set, because we are reading from a text file. The map output key schema is an Avro int and the value schema is the weather record schema. The final output key schema is the weather record schema, and the output format is AvroKeyOutputFormat, which writes keys to Avro datafiles and ignores the values (which are NullWritable).

The following commands show how to run the program on a small sample dataset:

% export HADOOP_CLASSPATH=avro-examples.jar
% export HADOOP_USER_CLASSPATH_FIRST=true # override version of Avro in Hadoop
% hadoop jar avro-examples.jar AvroGenericMaxTemperature \
    input/ncdc/sample.txt output

On completion we can look at the output using the Avro tools JAR to render the Avro datafile as JSON, one record per line:

% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"year":1949,"temperature":111,"stationId":"012650-99999"}
{"year":1950,"temperature":22,"stationId":"011990-99999"}

In this example we read a text file and created an Avro datafile, but other combinations are possible, which is useful for converting between Avro formats and other formats (such as SequenceFiles). See the documentation for the Avro MapReduce package for details.

Sorting Using Avro MapReduce

In this section, we use Avro’s sort capabilities and combine them with MapReduce to write a program to sort an Avro datafile (Example 12-3).

Example 12-3. A MapReduce program to sort an Avro datafile

public class AvroSort extends Configured implements Tool {

  static class SortMapper<K> extends Mapper<AvroKey<K>, NullWritable,
      AvroKey<K>, AvroValue<K>> {
    @Override
    protected void map(AvroKey<K> key, NullWritable value,
        Context context) throws IOException, InterruptedException {
      context.write(key, new AvroValue<K>(key.datum()));
    }
  }

  static class SortReducer<K> extends Reducer<AvroKey<K>, AvroValue<K>,
      AvroKey<K>, NullWritable> {
    @Override
    protected void reduce(AvroKey<K> key, Iterable<AvroValue<K>> values,
        Context context) throws IOException, InterruptedException {
      for (AvroValue<K> value : values) {
        context.write(new AvroKey(value.datum()), NullWritable.get());
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 3) {
      System.err.printf(
          "Usage: %s [generic options] <input> <output> <schema-file>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    String input = args[0];
    String output = args[1];
    String schemaFile = args[2];

    Job job = new Job(getConf(), "Avro sort");
    job.setJarByClass(getClass());
    job.getConfiguration().setBoolean(
        Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

    FileInputFormat.addInputPath(job, new Path(input));
    FileOutputFormat.setOutputPath(job, new Path(output));

    AvroJob.setDataModelClass(job, GenericData.class);

    Schema schema = new Schema.Parser().parse(new File(schemaFile));
    AvroJob.setInputKeySchema(job, schema);
    AvroJob.setMapOutputKeySchema(job, schema);
    AvroJob.setMapOutputValueSchema(job, schema);
    AvroJob.setOutputKeySchema(job, schema);

    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);

    job.setMapperClass(SortMapper.class);
    job.setReducerClass(SortReducer.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroSort(), args);
    System.exit(exitCode);
  }
}

This program (which uses the Generic Avro mapping and hence does not require any code generation) can sort Avro records of any type, represented in Java by the generic type parameter K. We choose a value that is the same as the key, so that when the values are grouped by key we can emit every value that shares a key (according to the sorting function). This means we don’t lose any records.[85] The mapper simply emits the input key wrapped in an AvroKey and an AvroValue. The reducer acts as an identity, passing the values through as output keys, which will get written to an Avro datafile.

The sorting happens in the MapReduce shuffle, and the sort function is determined by the Avro schema that is passed to the program. Let’s use the program to sort the pairs.avro file created earlier, using the SortedStringPair.avsc schema to sort by the right field in descending order. First, we inspect the input using the Avro tools JAR:

% java -jar $AVRO_HOME/avro-tools-*.jar tojson input/avro/pairs.avro
{"left":"a","right":"1"}
{"left":"c","right":"2"}
{"left":"b","right":"3"}
{"left":"b","right":"2"}

Then we run the sort:

% hadoop jar avro-examples.jar AvroSort input/avro/pairs.avro output \
    ch12-avro/src/main/resources/SortedStringPair.avsc

Finally, we inspect the output and see that it is sorted correctly:

% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"left":"b","right":"3"}
{"left":"b","right":"2"}
{"left":"c","right":"2"}
{"left":"a","right":"1"}

Avro in Other Languages

For languages and frameworks other than Java, there are a few choices for working with Avro data.

AvroAsTextInputFormat is designed to allow Hadoop Streaming programs to read Avro datafiles. Each datum in the file is converted to a string, which is the JSON representation of the datum, or just the raw bytes if the type is Avro bytes. Going the other way, you can specify AvroTextOutputFormat as the output format of a Streaming job to create Avro datafiles with a bytes schema, where each datum is the tab-delimited key-value pair written from the Streaming output. Both of these classes can be found in the org.apache.avro.mapred package.
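As a rough sketch only, a Streaming job reading an Avro datafile might be launched along the following lines; the streaming JAR location, the Avro JAR names passed to -libjars, and the trivial cat/wc mapper and reducer are all assumptions here, and the exact classpath setup varies by Hadoop and Avro version:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -libjars avro.jar,avro-mapred.jar \
    -inputformat org.apache.avro.mapred.AvroAsTextInputFormat \
    -input input/avro/pairs.avro \
    -output output \
    -mapper /bin/cat \
    -reducer /usr/bin/wc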

It’s also worth considering other frameworks like Pig, Hive, Crunch, and Spark for doing Avro processing, since they can all read and write Avro datafiles by specifying the appropriate storage formats. See the relevant chapters in this book for details.

[79] Named after the British aircraft manufacturer from the 20th century.

[80] Avro also performs favorably compared to other serialization libraries, as the benchmarks demonstrate.

[81] Avro can be downloaded in both source and binary forms. Get usage instructions for the Avro tools by typing java -jar avro-tools-*.jar.

[82] Default values for fields are encoded using JSON. See the Avro specification for a description of this encoding for each data type.

[83] A useful consequence of this property is that you can compute an Avro datum’s hash code from either the object or the binary representation (the latter by using the static hashCode() method on BinaryData) and get the same result in both cases.

[84] For an example that uses the Specific mapping with generated classes, see the AvroSpecificMaxTemperature class in the example code.

[85] If we had used the identity mapper and reducer here, the program would sort and remove duplicate keys at the same time. We encounter this idea of duplicating information from the key in the value object again in Secondary Sort.