
Appendix D. The Old and New Java MapReduce APIs

The Java MapReduce API used throughout this book is called the “new API,” and it replaces the older, functionally equivalent API. Although Hadoop ships with both the old and new MapReduce APIs, they are not compatible with each other. Should you wish to use the old API, you can, since the code for all the MapReduce examples in this book is available for the old API on the book’s website (in the oldapi package).

There are several notable differences between the two APIs:

§ The new API is in the org.apache.hadoop.mapreduce package (and subpackages). The old API can still be found in org.apache.hadoop.mapred.

§ The new API favors abstract classes over interfaces, since these are easier to evolve. This means that you can add a method (with a default implementation) to an abstract class without breaking old implementations of the class.[168] For example, the Mapper and Reducer interfaces in the old API are abstract classes in the new API.

§ The new API makes extensive use of context objects that allow the user code to communicate with the MapReduce system. The new Context, for example, essentially unifies the role of the JobConf, the OutputCollector, and the Reporter from the old API.

§ In both APIs, key-value record pairs are pushed to the mapper and reducer, but in addition, the new API allows both mappers and reducers to control the execution flow by overriding the run() method. For example, records can be processed in batches, or the execution can be terminated before all the records have been processed. In the old API, this is possible for mappers by writing a MapRunnable, but no equivalent exists for reducers. (A sketch of a mapper that overrides run() appears after this list.)

§ Job control is performed through the Job class in the new API, rather than the old JobClient, which no longer exists in the new API.

§ Configuration has been unified in the new API. The old API has a special JobConf object for job configuration, which is an extension of Hadoop’s vanilla Configuration object (used for configuring daemons; see The Configuration API). In the new API, job configuration is done through a Configuration, possibly via some of the helper methods on Job. (A new-API driver sketch follows Example D-1.)

§ Output files are named slightly differently: in the old API both map and reduce outputs are named part-nnnnn, whereas in the new API map outputs are named part-m-nnnnn and reduce outputs are named part-r-nnnnn (where nnnnn is an integer designating the part number, starting from 00000).

§ User-overridable methods in the new API are declared to throw java.lang.InterruptedException. This means that you can write your code to be responsive to interrupts so that the framework can gracefully cancel long-running operations if it needs to.[169]

§ In the new API, the reduce() method passes values as a java.lang.Iterable, rather than a java.lang.Iterator (as the old API does). This change makes it easier to iterate over the values using Java’s for-each loop construct:

for (VALUEIN value : values) { ... }
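To make the points about context objects and run() concrete, here is a minimal sketch of a new-API mapper that receives a Context and overrides run() to stop processing before the input split is exhausted. The class name, record limit, and the output it writes are illustrative only and are not part of the book’s examples:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative sketch, not one of the book's examples.
public class FirstNRecordsMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final long MAX_RECORDS = 1000; // illustrative cut-off

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    long processed = 0;
    // Pull records ourselves rather than having the framework push them,
    // and terminate before all the records in the split have been processed.
    while (processed < MAX_RECORDS && context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
      processed++;
    }
    cleanup(context);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // The Context takes on the roles of the old OutputCollector and Reporter.
    context.write(new Text("length"), new IntWritable(value.getLength()));
  }
}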

WARNING

Programs using the new API that were compiled against Hadoop 1 need to be recompiled to run against Hadoop 2. This is because some classes in the new MapReduce API changed to interfaces between the Hadoop 1 and Hadoop 2 releases. The symptom is an error at runtime like the following:

java.lang.IncompatibleClassChangeError: Found interface
org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected

Example D-1 shows the MaxTemperature application (from Java MapReduce) rewritten to use the old API. The differences are highlighted in bold.

WARNING

When converting your Mapper and Reducer classes to the new API, don’t forget to change the signatures of the map() and reduce() methods to the new form. Just changing your class to extend the new Mapper or Reducer classes will not produce a compilation error or warning, because these classes provide identity forms of the map() and reduce() methods (respectively). Your mapper or reducer code, however, will not be invoked, which can lead to some hard-to-diagnose errors.

Annotating your map() and reduce() methods with the @Override annotation will allow the Java compiler to catch these errors.
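For instance, a mapper converted to the new API should end up with a method like the following (a sketch; the class name and the record it writes are arbitrary). With @Override present, the compiler reports an error if the parameter list still matches the old API rather than Mapper’s map():

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of a mapper converted to the new API.
public class ConvertedMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override // compile error if the signature below doesn't match Mapper.map()
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Without @Override, an old-style signature (taking OutputCollector and
    // Reporter) would compile cleanly but never be invoked by the framework.
    context.write(new Text("key"), new IntWritable(1));
  }
}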

Example D-1. Application to find the maximum temperature, using the old MapReduce API

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class OldMaxTemperature {

  static class OldMaxTemperatureMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

      String line = value.toString();
      String year = line.substring(15, 19);
      int airTemperature;
      if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
        airTemperature = Integer.parseInt(line.substring(88, 92));
      } else {
        airTemperature = Integer.parseInt(line.substring(87, 92));
      }
      String quality = line.substring(92, 93);
      if (airTemperature != MISSING && quality.matches("[01459]")) {
        output.collect(new Text(year), new IntWritable(airTemperature));
      }
    }
  }

  static class OldMaxTemperatureReducer extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

      int maxValue = Integer.MIN_VALUE;
      while (values.hasNext()) {
        maxValue = Math.max(maxValue, values.next().get());
      }
      output.collect(key, new IntWritable(maxValue));
    }
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 2) {
      System.err.println("Usage: OldMaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    JobConf conf = new JobConf(OldMaxTemperature.class);
    conf.setJobName("Max temperature");

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setMapperClass(OldMaxTemperatureMapper.class);
    conf.setReducerClass(OldMaxTemperatureReducer.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    JobClient.runJob(conf);
  }
}
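For comparison, the following is a minimal sketch of what the driver looks like in the new API, with job control going through Job and a plain Configuration rather than JobClient and JobConf. The driver class name here is illustrative, and it assumes new-API MaxTemperatureMapper and MaxTemperatureReducer classes like those in Java MapReduce are on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch of a new-API driver for the same job; assumes new-API
// MaxTemperatureMapper and MaxTemperatureReducer classes exist.
public class NewMaxTemperatureDriver {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: NewMaxTemperatureDriver <input path> <output path>");
      System.exit(-1);
    }

    // A plain Configuration plus helper methods on Job replace the old JobConf.
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Max temperature");
    job.setJarByClass(NewMaxTemperatureDriver.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Job replaces JobClient for submission and monitoring.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}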

[168] Technically, such a change would almost certainly break implementations that already define a method with the same signature, but as Jim des Rivières explains in “Evolving Java-based APIs,” for all practical purposes this is treated as a compatible change.

[169] “Java theory and practice: Dealing with InterruptedException” by Brian Goetz explains this technique in detail.