
Chapter 6. Optimizing MapReduce Tasks

Most MapReduce programs are written for data analysis and usually take a long time to finish. Many companies are embracing Hadoop for advanced data analytics over large datasets that require completion-time guarantees. Efficiency, and especially the I/O cost of MapReduce, still needs to be addressed for these implementations to succeed.

In this chapter, we will discuss optimization techniques such as using compression and Combiners to improve job execution. You will also learn basic guidelines and rules for optimizing your mapper and reducer code, and techniques for reusing object instances.

The following topics will be covered in this chapter:

· The benefits of using Combiners

· The importance of using compression

· Learning to use appropriate Writable types

· How to reuse types smartly

· How to optimize your mappers and reducers' code

Using Combiners

You can improve your overall MapReduce performance by using Combiners. A Combiner is equivalent to a local Reduce operation and can effectively improve the rate of subsequent global Reduce operations. Basically, it is used to minimize the number of key/value pairs that will be transmitted across the network between mappers and reducers. A Combiner processes the intermediate key/value pairs output by the map tasks and does not affect the transformation logic coded in the map and reduce functions.

The standard convention when using Combiners is simply to repurpose your reducer function as your Combiner. The computing logic must be commutative (the order of the operands has no effect on the result, for example, a + b = b + a) and associative (the way operations are grouped has no effect on the result, for example, (a + b) + c = a + (b + c)).

Note

To get more information about Commutative and Associative properties, you can browse the following links:

http://en.wikipedia.org/wiki/Commutative_property

http://en.wikipedia.org/wiki/Associative_property

Implementing a Combiner means implementing a Combiner class. Once the customized class is implemented and set on the job, the map function will not immediately write its output to produce intermediate key/value pairs. Instead, the outputs are collected in lists, one list of values per key. The Combiner class then emits each key and its corresponding value list in the form of key/value pairs. When the Combiner buffer reaches a certain number of key/value pairs, the data in the buffer is cleared and transferred to the Reduce function.

Tip

Calling your Combiner customized class for a job is similar to how the map and reduce classes are set:

job.setCombinerClass(MyCombine.class);

The following screenshot shows the Hadoop counters you should focus on when using Combiners:

[Screenshot: Hadoop counters for a job without a Combiner]

In this screenshot, you can observe that the number of Combine input records and Combine output records is 0 because the job does not implement any Combiner class. Therefore, the number of Reduce input records is the same as the number of Map output records. The following screenshot shows the Combiner's effect once it has been implemented:

[Screenshot: Hadoop counters for the same job with a Combiner implemented]

In this screenshot, notice the values of Combine input records, Combine output records, Reduce input records, and Map output records. You can see that implementing the Combiner reduced the volume of data transferred to the Reduce function.

Compared to the previous figure, the number of Combine records for this job has increased due to the use of the Combiner function. The number of Reduce input records has dropped to 685,760, down from the initial 10,485,760. In a very large data context, using Combiners is an efficient way to improve a MapReduce job's overall performance.

The following snippet of code illustrates a custom Combiner class extracted from a MapReduce job:

[Code screenshot: custom Combiner class]

As you can see, the Combine class implements the Reducer interface and, like a reducer function, is called with the multiple values of the map output. The class overrides the reduce() method with its own code. The sample code between lines 12 and 23 iterates over the list of values transmitted by the mapper function and, whenever the current key differs from the previous key, calls the collect() method to output the result.
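Since the book's snippet appears only as a screenshot, the following is a minimal sketch of a comparable Combiner, assuming word-count-style Text/IntWritable types and the newer org.apache.hadoop.mapreduce API (the original uses the older Reducer interface with collect(), but the idea is the same). The class name MyCombine matches the tip above.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sum-style Combiner: locally aggregates the counts emitted by a map task
// before they are shuffled to the reducers.
public class MyCombine extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();  // addition is commutative and associative, so safe in a Combiner
        }
        result.set(sum);
        context.write(key, result);
    }
}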

Using compression

Compression reduces the number of bytes read from or written to the underlying storage system (HDFS). It improves the efficiency of network bandwidth and disk space usage. Using data compression is important in Hadoop, especially in a very large data context and under intensive workloads. In such a context, I/O operations and network data transfers take a considerable amount of time to complete, and the Shuffle and Merge process is also under huge I/O pressure.

Because disk I/O and network bandwidth are precious resources in Hadoop, data compression helps save these resources and minimize disk I/O and network transfer. Achieving this performance gain and saving these resources is not free, but the CPU cost of compressing and decompressing data is usually low.

Whenever I/O disk or network traffic affects your MapReduce job performance, you can improve the end-to-end processing time and reduce I/O and network traffic by enabling compression during any of the MapReduce phases.

Note

Compressing the map outputs will always help reduce the network traffic between the map and reduce tasks.

Compression may be enabled during any phase of the MapReduce job, as shown in the following figure:

[Figure: the MapReduce phases where compression can be enabled (input, mapper output, and reducer output)]

· Compress Input: This should be considered in a very large data context where you plan to process the same data repeatedly. You do not need to explicitly specify a codec to use: Hadoop automatically checks the extension of your files and, if it detects an appropriate extension, uses the matching codec to compress and decompress your files. Otherwise, no compression codec is used by Hadoop.

Note

When using replication, compressing input files saves storage space and speeds up data transfer. To compress input data, you should use splittable algorithms such as bzip2, or use zlib with the SequenceFile format.

· Compress Mapper output: Compression should be considered at this stage if your map tasks output a large amount of intermediate data. This will significantly improve the internal shuffle process, which is the most resource-consuming Hadoop process. You should always consider using compression if you observe slow network transfers due to large data volumes. To compress Mapper outputs, use faster codecs such as LZO, LZ4, or Snappy.

Note

Lempel-Ziv-Oberhumer (LZO) is a common compression codec used in Hadoop to compress data. It was designed to keep up with the hard disk reading speed and therefore treats speed as the priority, not the compression ratio. In comparison to the gzip codec, it compresses about five times faster and decompresses two times faster. A file compressed with LZO is 50 percent larger than the same file compressed with gzip, but still 25-50 percent smaller than the original file, which is good for performance, and the map phase completes about four times faster.

· Compress Reducer output: Enabling compression at this stage will reduce the amount of data to be stored and therefore the required disk space. This is also useful if you chain MapReduce jobs together while the input files of the second job are already compressed.

Note

Compressing Reducer output should be considered for storage and/or archival, for better write performance, or when the output feeds further MapReduce jobs. To compress Reducer output, use standard utilities such as gzip or bzip2 for data interchange, and faster codecs for chained jobs.

The following screenshot shows the number of bytes written and read by a MapReduce job before enabling compression on map's output files:

[Screenshot: bytes written and read by a MapReduce job before enabling compression on the map output files]

In order to enable map output file compression, you can change these configuration parameters (in the mapred-site.xml file) as follows:

[Screenshot: mapred-site.xml configuration parameters for map output compression]

By default, the mapred.compress.map.output value is set to false and the mapred.output.compression.type value is set to RECORD. Changing the latter to BLOCK enhances the compression ratio.
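As a rough sketch, the same properties can also be set programmatically on the job's Configuration object in the driver (the Hadoop 1.x property names used in this section are assumed; newer releases use the mapreduce.* names listed in the table that follows, and the codec choice is illustrative):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Equivalent to editing mapred-site.xml (Hadoop 1.x property names)
conf.setBoolean("mapred.compress.map.output", true);
conf.set("mapred.map.output.compression.codec",
        "org.apache.hadoop.io.compress.SnappyCodec");  // any installed codec works
conf.set("mapred.output.compression.type", "BLOCK");   // RECORD is the default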

After enabling compression for the map output files, the number of bytes read by the reduce function (268) has considerably decreased in comparison to the previous figure (858), as illustrated in the following figure:

[Screenshot: bytes written and read by the same job after enabling map output compression]

In order to enable compression in Hadoop, you can set the configuration parameters shown in the following table (a programmatic sketch follows the table):

| Parameter | Default value | Stage | Recommendations |
| --- | --- | --- | --- |
| io.compression.codec | DefaultCodec | Input compress | Hadoop uses file extensions to determine whether a compression codec is supported |
| mapreduce.map.output.compress | false | Mapper output | Set this parameter to true to enable compression |
| mapreduce.map.output.compress.codec | DefaultCodec | Mapper output | Use the LZO, LZ4, or Snappy codec to compress data at this stage |
| mapreduce.output.fileoutputformat.compress | false | Reducer output | Set this parameter to true to enable compression |
| mapreduce.output.fileoutputformat.compress.codec | DefaultCodec | Reducer output | Use a standard tool/codec such as gzip or bzip2 |
| mapreduce.output.fileoutputformat.compress.type | RECORD | Reducer output | Type of compression to use for SequenceFile outputs: NONE, RECORD, or BLOCK |
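The mapper- and reducer-output settings from the table can also be applied in the job driver. The following is a sketch, not the book's code, assuming the newer (Hadoop 2.x style) org.apache.hadoop.mapreduce API; the Snappy and gzip codec choices are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

Configuration conf = new Configuration();
// Mapper output: enable compression and pick a fast codec
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);

Job job = Job.getInstance(conf);
// Reducer output: compress the final files written to HDFS
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// For SequenceFile outputs, BLOCK compression usually gives a better ratio than RECORD
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);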

Using appropriate Writable types

Hadoop uses a custom datatype serialization/RPC mechanism and defines its own box type classes. These classes are used to manipulate strings (Text), integers (IntWritable), and so on, and they implement the Writable interface, which defines the serialization and deserialization protocol.

Therefore, all values in Hadoop are Writable objects, and all keys are instances of WritableComparable, which additionally defines a sort order, because keys need to be compared.

Writable objects are mutable and considerably more compact, as no meta information (class name, fields, superclasses, and so on) needs to be stored, and their straightforward random access gives higher performance. As binary Writable types take up less space, they reduce the size of the intermediate data written by the Map or Combiner function. Reducing intermediate data can provide a substantial performance gain by reducing network transfer and the required disk storage space.

Using the appropriate Writable type in your code contributes to enhancing the overall MapReduce job performance, mostly by eliminating time spent on string conversion and splitting by using the Text type instead of the String type. Also, VIntWritable or VLongWritable can sometimes be faster than the regular int and long primitive Java datatypes.
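As a quick, standalone illustration of the size difference (not from the book), the following compares the serialized length of a fixed-width IntWritable and a variable-length VIntWritable:

import java.io.IOException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.VIntWritable;

public class VIntDemo {
    public static void main(String[] args) throws IOException {
        DataOutputBuffer fixed = new DataOutputBuffer();
        new IntWritable(42).write(fixed);        // always 4 bytes

        DataOutputBuffer variable = new DataOutputBuffer();
        new VIntWritable(42).write(variable);    // 1 byte for small values

        System.out.println("IntWritable: " + fixed.getLength()
                + " bytes, VIntWritable: " + variable.getLength() + " bytes");
    }
}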

During the Shuffle and Sort phase, comparing intermediate keys may become a bottleneck, and Hadoop may spend a significant amount of time in this phase. Implementing a faster comparison mechanism can improve your MapReduce performance. There are two ways to compare your keys:

· By implementing the org.apache.hadoop.io.WritableComparable interface

· By implementing the RawComparator interface

Based on our experience, implementing raw byte comparisons using RawComparator improves the overall performance of the MapReduce job and has an advantage over WritableComparable.

A typical implementation of the WritableComparable interface will look like the following snippet of code:

[Code screenshot: a WritableComparable key implementation]
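The screenshot is not reproduced here; the following is a minimal sketch of such a key type, with hypothetical fields (an int id and a long timestamp) chosen purely for illustration:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Illustrative composite key; the fields are not the book's exact code.
public class MyKey implements WritableComparable<MyKey> {

    private int id;
    private long timestamp;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeLong(timestamp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        timestamp = in.readLong();
    }

    @Override
    public int compareTo(MyKey other) {
        // Keys must be fully deserialized before this comparison can run.
        int cmp = Integer.compare(id, other.id);
        return (cmp != 0) ? cmp : Long.compare(timestamp, other.timestamp);
    }
}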

To implement RawComparator, you can also extend the WritableComparator class, which implements RawComparator. The following snippet of code illustrates a typical implementation of the WritableComparator class:

[Code screenshot: a WritableComparator implementation]
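Again as a sketch (assuming the hypothetical MyKey above, and the MyClassComparator name used in the tip that follows), a raw comparator can compare the serialized bytes directly without deserializing the keys:

import org.apache.hadoop.io.WritableComparator;

public class MyClassComparator extends WritableComparator {

    protected MyClassComparator() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // readInt() turns the first 4 serialized bytes of each key into an int.
        int id1 = readInt(b1, s1);
        int id2 = readInt(b2, s2);
        if (id1 != id2) {
            return (id1 < id2) ? -1 : 1;
        }
        // The next 8 bytes hold the long timestamp written by MyKey.write().
        long ts1 = readLong(b1, s1 + 4);
        long ts2 = readLong(b2, s2 + 4);
        return (ts1 < ts2) ? -1 : ((ts1 == ts2) ? 0 : 1);
    }
}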

Note

Extending the WritableComparator class allows you to use inherited methods of this class to manipulate your intermediate MapReduce keys.

The readInt() method inherited from WritableComparator converts 4 consecutive bytes into a primitive Java int (which is 4 bytes).

Tip

To wire up your RawComparator custom class implementation, set its sort comparator class as follows:

job.setSortComparatorClass(MyClassComparator.class);

Depending on the data you want to process, you may need to define how your data files are read into the mapper instances. Hadoop allows you to define your own data format by implementing the InputFormat interface, and it comes with several implementations of it.

Note

The InputFormat class is one of the fundamental classes of the Hadoop framework, and it is responsible for defining two main things: InputSplit and RecordReader.

An InputFormat class describes both how to present the data to the Mapper and where the data originates from. The InputSplit interface defines both the size of an individual map task and its potential execution server. The RecordReader interface is responsible for reading records from the input file and submitting them (as key/value pairs) to the mappers. Another important job of the InputFormat class is to split the input sources into fragments, represented by FileSplit instances, which are used as the input for individual mappers. In order to improve the job's performance, this process must be quick and cheap (it should use minimal CPU, I/O storage, and network resources).

Tip

When creating your own InputFormat class, it is better to subclass the FileInputFormat class rather than to implement InputFormat directly.
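A minimal sketch of such a subclass (the class name is hypothetical): it reuses the built-in LineRecordReader and only overrides the splittability decision.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class MyInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // Delegate record reading to the standard line-oriented reader.
        return new LineRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Returning false forces one mapper per file; keep the default (true)
        // unless your records must not be split across mappers.
        return false;
    }
}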

Reusing types smartly

Hadoop problems are often caused by some form of memory mismanagement; nodes don't suddenly fail but experience slowdown as I/O devices go bad. Hadoop has many options for controlling memory allocation and usage at several levels of granularity, but it does not check these options. So, it is possible for the combined heap size of all the daemons on a machine to exceed the amount of physical memory.

Each Java process itself has a configured maximum heap size. Depending on whether the JVM heap size, OS limit, or physical memory is exhausted first, this will cause an out-of-memory error, a JVM abort, or severe swapping, respectively.

You should pay attention to memory management. All unnecessarily allocated memory resources should be removed to maximize memory space for the MapReduce jobs.

Reusing types is a technique for minimizing resources usage such as CPU and memory space. When you deal with millions of data records, it is always cheaper to reuse an existing instance rather than create a new one. The simplest kind of reusability in a MapReduce job is to use an existing Hadoop Writable type as it is, and this is possible in most circumstances.

One of the most common errors beginners make when coding a map or reduce function is to allocate a new object for every output, often inside a for or foreach loop, which may create thousands or millions of new Writable instances. These instances have a very short TTL (Time To Live) and force Java's garbage collector to work intensively to reclaim all the memory allocated for them.

The following snippet of code shows a mapper function that allocates a new Writable instance for every output (you should avoid coding this way).

[Code screenshot: mapper allocating a new Writable instance for every output]
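Since the snippet appears only as a screenshot, here is a sketch of the anti-pattern it describes, assuming a word-count-style mapper (the names are illustrative):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WastefulMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (token.isEmpty()) {
                continue;
            }
            // A new Text and a new IntWritable for every single output:
            // millions of short-lived objects for the garbage collector to reclaim.
            context.write(new Text(token), new IntWritable(1));
        }
    }
}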

In order to detect whether you are spending time allocating unnecessary objects, you should inspect your tasks' logs and analyze the garbage collector activity. If garbage collection is frequent and takes a lot of time, you should review your code and improve it by eliminating the unnecessary creation of new objects.

This is because newly created objects are stored in heap memory; when memory is low, the heap threshold is exceeded sooner and the garbage collector must run more often.

Note

Reusing Writable variables is theoretically better, but based on Amdahl's law (http://en.wikipedia.org/wiki/Amdahl%27s_law), the improvement may not be noticeable.

Logs are commonly used to diagnose issues. Often, the line that indicates the problem is buried among thousands of lines, and picking out the relevant line is a long and tedious task. To inspect MapReduce task logs in more detail, you should set the JVM memory parameters in the mapred-site.xml configuration file, as shown in the following table:

| Parameter | Default value | Recommendations |
| --- | --- | --- |
| mapred.child.java.opts | -Xmx200m | Add: -verbose:gc -XX:+PrintGCDetails |

The following snippet of code shows a better approach, which benefits from reusing Writable variables:

[Code screenshot: mapper reusing Writable instances]
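As a sketch of the improved version (same assumptions as the mapper above), the Writable instances are allocated once per task and only their contents change:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReusingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Allocated once; safe to reuse because the framework serializes the
    // key/value pair at the moment context.write() is called.
    private final Text word = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (token.isEmpty()) {
                continue;
            }
            word.set(token);
            context.write(word, ONE);
        }
    }
}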

JVM reuse is an optimization technique that reuses a JVM for multiple tasks: if it is enabled, multiple tasks are executed sequentially in one JVM. You can enable JVM reuse by changing the appropriate parameter in the mapred-site.xml configuration file, as shown in the following table (a programmatic equivalent follows the table):

| Parameter | Default value | Recommendations |
| --- | --- | --- |
| mapred.job.reuse.jvm.num.tasks | 1 | Set this to the desired number of tasks per JVM (for example, 2 to run two tasks). If this variable is set to -1, the number of tasks the JVM can execute is not limited. |
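The same setting can also be applied programmatically; the following is a sketch assuming the old mapred API (JobConf), where setNumTasksToExecutePerJvm maps to this property:

import org.apache.hadoop.mapred.JobConf;

JobConf jobConf = new JobConf();
jobConf.setNumTasksToExecutePerJvm(-1);  // -1 = no limit on tasks per JVM
// Or set the property directly on a Configuration object:
// conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);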

Optimizing mappers and reducers code

Optimizing MapReduce code-side performance in detail exceeds the scope of this book. In this section, we will provide a basic guideline with some rules to contribute to the improvement of your MapReduce job performance.

One of the important features of Hadoop is that all data is processed in units known as records. While records are almost the same size, theoretically the time to process them should be the same. In practice, however, the processing time of records within a task varies significantly, and slowness may appear when reading a record from memory, processing the record, or writing the record back to memory. Moreover, two other factors may affect mapper or reducer performance in practice: I/O access time and spill, and the overhead waiting time resulting from heavy I/O requests.

Note

Efficiency is measurable and quantitatively determined by the ratio of output to input.

MapReduce provides ease of use: a programmer defines a job with only map and reduce functions, without having to specify the physical distribution of the job across nodes. Therefore, Hadoop provides a fixed dataflow to perform a MapReduce job. This is why many complex algorithms are hard to implement with only mappers and reducers in a MapReduce job. In addition, some algorithms that require multiple inputs are not well supported, since the dataflow of MapReduce was originally designed to read a single input and generate a single output.

Optimizing a MapReduce job means:

· Getting the same output in less time

· Using fewer resources to get the same output in the same time

· Getting more output with the same resources in the same time

In any programming language, factorizing your code is the first optimization step. If you run multiple jobs that process the same input data, it is probably an opportunity to rewrite them into fewer jobs. Also, when writing your mapper or reducer code, you should choose the most efficient underlying algorithm, which will contribute to speeding up your job; otherwise, you may have to deal with slowness and bad performance, because inefficiencies in your code directly slow down your MapReduce jobs.

Tip

Check the current logging level of JobTracker with the following command:

hadoop daemonlog -getlevel {HadoopMachineName}:50030 org.apache.hadoop.mapred.JobTracker

Set the Hadoop log debug level for JobTracker as follows:

hadoop daemonlog -setlevel {HadoopMachineName}:50030 org.apache.hadoop.mapred.JobTracker DEBUG

The second step is to determine whether there is a problem when executing your MapReduce job. All mappers and reducers terminated successfully if the number of failed tasks for the job is zero. If this number is nonzero, there is basically something wrong with your program. If the number is low (two or three), a node may simply be unstable.

Note

Sometimes it's not your job itself that causes the problem; something else on the cluster might be responsible.

Some algorithms require global state information during their processing, while MapReduce does not manage state information during its execution. MapReduce reads data iteratively and materializes intermediate results on local disk in each iteration, which requires a lot of I/O operations. If you need to implement such algorithms, you should consider using third-party tools such as HaLoop (http://code.google.com/p/haloop/) or Twister (http://www.iterativemapreduce.org/).

You can enhance your MapReduce job by reducing distribution time and network bandwidth required by Hadoop to distribute a heavy application. Remember that the MapReduce framework presumes a compute-light application, as the compute task needs to get transmitted across to each of the nodes of the cluster where it is scheduled to run in parallel. Transferring applications that have a very large code footprint across the nodes of the cluster would take so much time that it would completely drain the throughput benefit expected to be achieved from parallel execution of its multiple instances.

Note

Try pre-installing your computation task on all nodes; this completely avoids the need for task distribution and the associated time loss.

Remember that MapReduce is designed to process a very large data volume as its input records. If, for some reason, your mapper fails to read an input record, it is counterproductive to kill the task each time it fails, because in the end the result will remain the same: a mapper task that still fails.

To prevent such a situation, you should handle input errors in the reader and report them (sometimes you need to create a custom OutputFormat class) so they can be tracked by an administrator or during a debugging session.

Tip

If your application can tolerate skipping some records, Hadoop provides a feature for skipping over bad records through the SkipBadRecords class; a usage sketch follows these signatures:

setMapperMaxSkipRecords(Configuration conf, long maxSkipRecs)

setReducerMaxSkipGroups(Configuration conf, long maxSkipGrps)
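The following sketch shows how these calls might be wired into a job configuration; record skipping is supported by the old mapred API, and the thresholds shown are illustrative:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SkipBadRecords;

JobConf conf = new JobConf();
// Start skipping after the task has failed twice on the same record range
SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
// Tolerate at most one bad input record per mapper skip range
SkipBadRecords.setMapperMaxSkipRecords(conf, 1L);
// Tolerate at most one bad key group per reducer skip range
SkipBadRecords.setReducerMaxSkipGroups(conf, 1L);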

Summary

In this chapter, we learned about MapReduce Combiners and how they help improve the overall job execution time. We also covered why it is important to use compression, especially in a large data volume context.

Then we covered Java code-side optimization and learned about choosing appropriate Writable types and how to reuse these types smartly. We also learned about WritableComparator and RawComparator custom class implementation.

In the final section, we covered basic guidelines with some rules to tune your Hadoop configuration and enhance its performance.

In the next chapter, we will learn more about MapReduce optimization best practices. Keep reading!