Chapter 5. Enhancing Map and Reduce Tasks

The Hadoop framework already includes several built-in counters, such as the number of bytes read and written. These counters are very helpful for learning about the framework's activity and the resources it uses. They are sent by the worker nodes to the master node periodically.

In this chapter, we will learn how to enhance each of the map and reduce phases, which counters to look at, and which techniques to apply in order to analyze a performance issue. You will then learn how to tune the relevant configuration parameters with appropriate values.

In this chapter, we will cover the following topics:

· The impact of the block size and input data

· How to deal with small and unsplittable files

· Reducing map-side spilling records

· Improving the Reduce phase

· Calculating Map and Reduce tasks' throughput

· Tuning map and reduce parameters

Enhancing map tasks

When executing a MapReduce job, the Hadoop framework executes the job in a well-defined sequence of processing phases. Except for the user-defined functions (map, reduce, and combiner), the execution of the other MapReduce phases is generic across different MapReduce jobs. The processing time mainly depends on the amount of data flowing through each phase and the performance of the underlying Hadoop cluster.

In order to enhance MapReduce performance, you first need to benchmark these different phases by running a set of jobs with different amounts of data (per map/reduce task). Running these jobs lets you collect measurements such as the duration of, and the data volume passing through, each phase, and then analyze these measurements to derive the platform's scaling functions.

To identify map-side bottlenecks, you should outline five phases of the map task execution flow. The following figure represents the map tasks' execution sequence:

[Figure: the map task execution sequence]

Let us see what each stage does:

· During the Read phase, a map task typically reads a block of a fixed size (for example, 64 MB) from the Hadoop Distributed File System (HDFS). However, the data files written to HDFS can be of arbitrary size, for example, 80 MB. In this case, storing the data requires two blocks: one of 64 MB and one of 16 MB. When profiling this phase, we measure the duration of the Read phase as well as the amount of data read by the map task.

· To profile the Map phase, you need to measure the duration of the entire map function and the number of processed records, and then normalize this execution time per record. When measuring the execution time, you should check for any skewed data, which is often the result of a large number of small input files or a large unsplittable file. Compare the input sizes across all map tasks (of the same job) to check whether there is any data skew.

· During the Spill phase, the framework locally sorts the intermediate data and partitions it for the different reduce tasks, applies the combiner if available, and then writes the intermediate data to the local disk. To profile this phase, we will measure the time taken to do all of this. If you are using Combiners (discussed in Chapter 6, Optimizing MapReduce Tasks), the processing time will include their execution time.

· In the Fetch phase, we measure the time the framework takes to buffer the map phase's output into memory, as well as the amount of intermediate data generated.

· In the last phase, the Merge phase, we measure the time the framework takes to merge the different spill files into a single spill file for each reduce task.

Input data and block size impact

Prior to the Read phase, the data must already be present on the filesystem. Your data layout also affects MapReduce job performance. To run a map task efficiently, the data must be splittable (as text files are), so that MapReduce can break it into chunks and process each one independently. An input split should ideally be big enough to fill one block. The block size is important because it determines how your data is split up, and every input split is assigned to a mapper. Therefore, a very large dataset combined with a small block size results in a lot of mappers: each mapper finishes very quickly, but the overhead of creating, scheduling, and tearing down so many tasks slows the overall job. So, for a large input file, this value should be high (for example, 256 MB). The default Hadoop block size is 64 MB (a good choice for small datasets), with 128 MB suited to medium datasets and 256 MB to large datasets. A larger block size may speed up disk I/O, but it increases the amount of data transferred across the network and may also cause spilled records during the map phase.
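Note that the block size used for splitting is the one each file was written with, so changing dfs.block.size affects only data written afterwards. As a minimal sketch (illustrative values, and a placeholder job name), the following Java code shows the two settings you would typically adjust with the new MapReduce API to obtain larger splits:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Minimal sketch: requesting larger input splits so that each mapper
// processes a bigger chunk of a large input file.
public class SplitSizeTuning {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // dfs.block.size (in bytes) only applies to files written after the change.
        conf.setLong("dfs.block.size", 256L * 1024 * 1024);        // 256 MB

        Job job = new Job(conf, "large-input-job");                // job name is illustrative
        // Ask for splits of at least 256 MB even if the files were stored with smaller blocks.
        FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
    }
}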

A map task has two ways to read data from a storage system: directly from the disk (direct I/O) or by streaming the data (streaming I/O, through an interprocess communication scheme such as TCP/IP or JDBC). Streaming I/O is more general and can be used to read data from both the local node and remote nodes.

Hadoop distributes data across multiple nodes to balance the cluster's workload and assigns tasks to the computing nodes where the input data is located. This is why data locality is important and may impact the performance of the cluster. If the data is not located on the node where the mapper processes it, the data will be transferred across the network, which increases network traffic. Direct I/O is more efficient if the map function reads data from the local node (data-local maps). Streaming I/O is the only choice if a map function is not data-local.

Dealing with small and unsplittable files

Your business may require you to process large and small, binary or compressed files that HDFS can't split. Binary and compressed files are, by nature, not block-based. Therefore, they may affect the performance of your MapReduce jobs due to the loss of data locality.

Hadoop was not designed to deal with small files; HDFS is designed to store and process large datasets (terabytes). Storing a large number of small files in HDFS is inefficient. The problem with a large number of small files is that they generate a lot of parallel tasks, and too much parallelism consumes more resources than necessary, which affects the runtime of your job. Also, when a large number of small files is stored in the system, their metadata occupies a large portion of the NameNode's memory, which is limited by its physical capacity.

A file is considered small if its size is less than the HDFS block size (64 MB by default) and big if its size exceeds the block size. In order to inspect the size of your input files, browse the Hadoop DFS home page at http://machinename:50070/dfshealth.jsp and click on Browse the filesystem. The following screenshot shows a DFS directory that contains small files; each file has a size of 1 MB, while the block size is 256 MB:

[Screenshot: a DFS directory containing 1 MB files stored with a 256 MB block size]

The easiest way to solve the problem of small files in HDFS is to package them into a larger file. You then store fewer files in the NameNode's memory and improve data interchange, and all your small files are stored on the local disks within a single file. In order to pack small files with Hadoop, you can choose any of the following alternatives:

· Create container files using Avro to serialize data. Avro is an Apache open source project (http://avro.apache.org/) that provides data serialization for Hadoop and data exchange services, which can be used together or independently.

· Use Hadoop Archives (HAR files). HAR files are a special archive format that builds a layered filesystem on top of HDFS so that the original files can be accessed in parallel, transparently, and efficiently by Hadoop without expanding them. To create a HAR archive, you can use the hadoop archive command and a har:// URL to access the files.

· Use a sequence file (SequenceFile) to store the small files in one larger single file. A SequenceFile is structured as key/value pairs, in which you define the filename as the key and its contents as the value. Another advantage of sequence files is that they are splittable and allow block compression (see the sketch following this list).
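As a minimal sketch (the paths and directory layout are assumptions for illustration, and the classic Hadoop 1.x SequenceFile.createWriter signature is used), the following Java code packs a local directory of small files into a single SequenceFile, with each filename as the key and the file's bytes as the value:

import java.io.File;
import java.nio.file.Files;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Packs every file in a local directory into one SequenceFile on HDFS:
// key = filename, value = raw file contents.
public class SmallFilePacker {
    public static void main(String[] args) throws Exception {
        File localDir = new File(args[0]);          // local directory of small files
        Path output   = new Path(args[1]);          // e.g. /data/packed.seq (hypothetical path)

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, output, Text.class, BytesWritable.class);
        try {
            for (File f : localDir.listFiles()) {
                byte[] content = Files.readAllBytes(f.toPath());
                writer.append(new Text(f.getName()), new BytesWritable(content));
            }
        } finally {
            writer.close();
        }
    }
}

A mapper that later reads this SequenceFile receives the filename as its key and the original contents as its value, so the individual files remain identifiable.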

To determine whether a file fits into a block, you need to calculate the average number of input bytes per map task and compare it to the block size. If the average is larger than the block size, you need to investigate why Hadoop didn't split your file. This may be because your file does not comply with a splittable input format such as TextInputFormat or LzoTextInputFormat.

Note

Hadoop compression codecs such as Deflate, gzip, and Snappy are not splittable. LZO-compressed files are splittable only if they have been indexed.

Reducing spilled records during the Map phase

Map functions may write a large amount of data to the local filesystem during the map phase. While map tasks are running, they generate intermediate output that is stored in a memory buffer, which is set to 100 MB by default (io.sort.mb). This buffer is a chunk of reserved memory that is part of the map task JVM's heap space. As soon as an occupancy threshold is reached (io.sort.spill.percent), the content of the buffer is flushed to the local disk; this is what we call a spill. To store the spilled records' metadata (16 bytes per record), the Hadoop framework allocates 5 percent of the buffer defined by io.sort.mb (the io.sort.record.percent parameter, 0.05 by default); hence 5 MB is allocated to the metadata and 95 MB is left for the records themselves. Take a look at the following diagram:

[Figure: the map output buffer (io.sort.mb) split between record data and metadata]

Each of the parameters shown in the preceding diagram is described as follows:

· io.sort.mb (default value: 100): This parameter indicates the amount of memory, in megabytes, allocated to sort and store the map task's output. To set this value, it is good practice not to exceed 70 to 75 percent of one-fifth of the available memory on the DataNode.

· io.sort.factor (default value: 10): This sort factor parameter indicates the number of files to merge together in a single pass. Depending on your DataNode memory size, this value should be set to one-tenth of the amount of memory defined by io.sort.mb.

· io.sort.record.percent (default value: 0.05): This parameter determines the fraction of io.sort.mb used to store the map output's metadata. We recommend keeping the default value for this parameter.

· io.sort.spill.percent (default value: 0.80): This parameter determines the buffer occupancy threshold after which the map output buffer is spilled to disk. We recommend keeping the default value; to use nearly the full buffer capacity, you can raise it up to 0.99.

· tasktracker.http.threads (default value: 40): This parameter determines the number of threads that serve the mappers' output to the reducers. This value is set per TaskTracker and should not be set by individual jobs. It may be increased on a very large cluster.

Performance issues and extra read overhead may occur when records are spilled to disk multiple times. Profiling this step of the MapReduce data flow means detecting whether your map tasks perform additional spills. To determine whether there are additional spills, compare the Map output records and Spilled Records Hadoop counters: if the number of Spilled Records is greater than Map output records, additional spilling is certainly occurring. The following screenshot reports the Hadoop counters for a MapReduce job that has spilled records:

[Screenshot: Hadoop counters showing Map output records and Spilled Records for a job that spills]

To enhance the framework's performance at this stage and eliminate the additional spills to disk, you need to size the memory buffer accurately and set io.sort.spill.percent to 0.99 so that nearly the full buffer capacity is used.

To determine the memory space required by the buffer, you should calculate the total size of the buffer (records + metadata). To derive io.sort.record.percent and io.sort.mb, you need to calculate the following values (let's assume that we use the counters reported in the previous screenshot):

· Record length (RL) = map output bytes / map output records, that is, 671,089,000 / 6,710,890 = 100 bytes

· Spilled Records size (RS) = Spilled Records number * Record length, that is, 1,342,178 * 100 = 134,217,800 bytes = 128 MB

· Metadata size (MS) = metadata length * Spilled Records number, that is, 1,342,178 * 16 = 21,474,848 bytes = 20.48 MB

Once you have calculated the values of RL, RS, and MS, you can calculate the total size of the buffer (records + metadata) as follows (a sketch automating this arithmetic follows the list):

· io.sort.record.percent = metadata length / (metadata length + Record length), that is, 16 / (16 + 100) = 0.138

· io.sort.mb = Spilled Records size + Metadata size, that is, 128 + 20.48 ≈ 149 MB
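As a convenience, the following Java sketch (a simple calculation aid, not part of Hadoop) reproduces this arithmetic with the sample counter values; replace them with the counters reported for your own job:

// A small calculation aid that automates the buffer-sizing arithmetic above,
// fed with the counter values from the sample job.
public class SpillBufferSizing {
    public static void main(String[] args) {
        long mapOutputBytes   = 671089000L;   // Map output bytes counter
        long mapOutputRecords = 6710890L;     // Map output records counter
        long spilledRecords   = 1342178L;     // Spilled Records counter
        int  metadataLength   = 16;           // bytes of metadata per record

        double recordLength  = (double) mapOutputBytes / mapOutputRecords;   // ~100 bytes
        double spilledBytes  = spilledRecords * recordLength;                // ~128 MB
        double metadataBytes = spilledRecords * (double) metadataLength;     // ~20.5 MB

        double ioSortRecordPercent = metadataLength / (metadataLength + recordLength); // ~0.138
        double ioSortMb = (spilledBytes + metadataBytes) / (1024 * 1024);              // ~149 MB

        System.out.printf("io.sort.record.percent = %.3f%n", ioSortRecordPercent);
        System.out.printf("io.sort.mb >= %.0f MB%n", Math.ceil(ioSortMb));
        // These values would then be set in the job configuration or mapred-site.xml.
    }
}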

Calculating map tasks' throughput

During the Map phase, map tasks may be slowed down by small files, which means Hadoop spends a lot of time starting and stopping tasks. When working with large unsplittable files, Hadoop spends I/O time reading data from other nodes. Poor read/write disk performance will also affect Hadoop's MapReduce performance.

To determine whether your map tasks run slowly and have a low throughput, you need to calculate this throughput using the Hadoop I/O counters, based on the file size read (or written) by the individual map tasks and the elapsed time taken to process them. If the calculated throughput is close to the local disk I/O throughput, you can consider it optimal; otherwise, some other factor is affecting your map tasks' performance.

Let's assume you have a MapReduce job using N map tasks; the throughput is calculated in bytes/sec (bytes per second) as follows:

Throughput (N map tasks) = sum (each map input size in bytes) / sum (each map execution time in seconds).

The following screenshot shows the Hadoop counter you need to calculate a map task's throughput, which you can find in the task's stats history:

[Screenshot: Hadoop counter used to calculate the map task's throughput]

The reported map execution time is shown in the following screenshot:

[Screenshot: the reported map execution time]

Therefore, using the counter values for this single sample map task, the throughput is 67,108,900 / 24 = 2,796,204.16 bytes/sec ≈ 2.66 MB/sec.

It is recommended to evaluate map task performance based on the average throughput of all map tasks and not on a single map's throughput, unless your job uses only one map task. You can calculate the average map task throughput using the formula: average throughput = sum (each map input size in bytes / its execution time in seconds) / number of map tasks.
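As a hedged illustration, the following Java sketch computes both the per-task and the average map throughput from a few placeholder (input bytes, execution time) pairs; replace them with the counter values read from your own job history:

// A small calculation aid for the per-task and average map throughput formulas above.
// The (input bytes, execution time) pairs are placeholders for the values you would
// read from the Hadoop job history for each map task.
public class MapThroughput {
    public static void main(String[] args) {
        long[] inputBytes = { 67108900L, 67108900L, 33554450L };
        long[] mapSeconds = { 24, 30, 15 };

        double sumOfThroughputs = 0;
        for (int i = 0; i < inputBytes.length; i++) {
            double perTask = (double) inputBytes[i] / mapSeconds[i];   // bytes/sec for one task
            sumOfThroughputs += perTask;
            System.out.printf("Map %d throughput: %.2f MB/s%n", i, perTask / (1024 * 1024));
        }
        // average throughput = sum (input size / execution time) / number of map tasks
        System.out.printf("Average map throughput: %.2f MB/s%n",
                sumOfThroughputs / inputBytes.length / (1024 * 1024));
    }
}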

It is also interesting to calculate the concurrent average throughput in order to determine your cluster capabilities. Let's say you have a cluster with a capacity of 100 map slots, 5.231 MB/s I/O throughput, 4.863 MB/s average I/O throughput, and you want to process 5000 files (256 MB each). In this case, because of the cluster capacity, this job will be processed in 50 MapReduce waves (5000 / 100 = 50). To calculate the concurrent throughput, multiply the throughput rate by the minimum of the number of files (5000 in this example) and the number of available map slots in your cluster (100 in this example).

In our example, you can calculate the concurrent throughput as 5.231 * 100 = 523.1 MB/s. The average concurrent throughput can be calculated as 4.863 * 100 = 486.3 MB/s.

Calculating the average concurrent throughput will help you to estimate the time required to process your input files using your Hadoop cluster. Therefore, in our example, 5000 files will be processed in (5000 files * 256 MB each) / 486.3 MB/s average throughput = 2632.12 sec ≈ 44 min.
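The following Java sketch (a simple calculation aid using the sample figures above, which are illustrative rather than measured on your cluster) reproduces this estimate:

// A small calculation aid reproducing the cluster-level estimate above
// (5,000 files of 256 MB, 100 map slots, 4.863 MB/s average per-task throughput).
public class ThroughputEstimate {
    public static void main(String[] args) {
        int    files             = 5000;
        double fileSizeMb        = 256.0;
        int    mapSlots          = 100;
        double avgTaskThroughput = 4.863;   // MB/s per map task (sample figure)

        int    waves                = (int) Math.ceil((double) files / mapSlots);      // 50
        double concurrentThroughput = avgTaskThroughput * Math.min(files, mapSlots);   // 486.3 MB/s
        double totalSeconds         = (files * fileSizeMb) / concurrentThroughput;     // ~2632 s

        System.out.printf("MapReduce waves: %d%n", waves);
        System.out.printf("Average concurrent throughput: %.1f MB/s%n", concurrentThroughput);
        System.out.printf("Estimated processing time: %.0f sec (~%.0f min)%n",
                totalSeconds, totalSeconds / 60);
    }
}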

Enhancing reduce tasks

Reduce task processing consists of a sequence of three phases. Only the execution of the user-defined reduce function is custom, and its duration depends on the amount of data flowing through each phase and the performance of the underlying Hadoop cluster. Profiling each of these phases will help you to identify potential bottlenecks and low speeds of data processing. The following figure shows the three major phases of Reduce tasks:

[Figure: the three major phases of a reduce task]

Let's see each phase in some detail:

· Profiling the Shuffle phase implies that you need to measure the time taken to transfer the intermediate data from map tasks to the reduce tasks and then merge and sort them together. In the shuffle phase, the intermediate data generated by the map phase is fetched. The processing time of this phase significantly depends on Hadoop configuration parameters and the amount of intermediate data that is destined for the reduce task.

· In the Reduce phase, each reduce task is assigned a partition of the map output intermediate data with a fixed key range; so the reduce task must fetch the content of this partition from every map task's output in the cluster. The execution time is the time taken to apply the user-supplied reduce function on the input key and all the values corresponding to it. To profile the Reduce phase, measure the execution time, which also depends on the size of the input data.

· Profiling the Write phase, which is the last phase, means measuring how long Hadoop takes to write the reduce output to HDFS.

Calculating reduce tasks' throughput

On the reduce side, a reduction in speed may be caused by bad or unoptimized user code in the reduce function, a hardware issue, or a misconfiguration of the Hadoop framework. To determine the throughput of your reduce tasks, you can calculate it using the Hadoop I/O counters (as you did for the map tasks' throughput).

For a given MapReduce job using N reduce tasks, the throughput is calculated in bytes/sec (bytes per second) using the formula throughput (N reduce tasks) = sum (each reduce input shuffle in bytes) / sum (each reduce execution time in seconds).

The following screenshot shows the Hadoop counter you need to calculate the reduce tasks' throughput:

[Screenshot: Hadoop counter used to calculate the reduce tasks' throughput]

The following screenshot shows the reduce function's execution time:

[Screenshot: the reduce function's execution time]

The Reduce phase's total execution time is the aggregation of the execution time of three steps: the Shuffle execution time, the Sort time, and the Reduce function execution time. The execution time for each step is reported by Hadoop logs, as shown in the previous screenshot.

It may also be interesting to calculate the throughput of the Shuffle and Sort steps. The Shuffle throughput is calculated as Shuffle bytes / Shuffle time (in seconds), and the Sort throughput as Shuffle bytes / Sort time (in seconds).

As you can see, these formulas may lead to a division-by-zero error if the Shuffle or Sort time is equal to zero. The Shuffle and Sort throughputs are inversely proportional to the Shuffle and Sort times: the closer the Shuffle or Sort time is to zero, the larger the corresponding throughput, so the throughput is maximal at the lowest Shuffle or Sort time values.
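A small Java sketch of these two formulas, with a guard for the zero-time case, might look like the following; the counter and timing values are placeholders:

// A sketch of the Shuffle and Sort throughput formulas with a guard against
// the division-by-zero case mentioned above.
public class ShuffleSortThroughput {
    // Returns throughput in bytes/sec, or positive infinity when the step took
    // (effectively) zero time.
    static double throughput(long shuffleBytes, long stepSeconds) {
        return stepSeconds == 0 ? Double.POSITIVE_INFINITY
                                : (double) shuffleBytes / stepSeconds;
    }

    public static void main(String[] args) {
        long shuffleBytes   = 134217800L;   // shuffle bytes counter (placeholder value)
        long shuffleSeconds = 12;           // Shuffle time from the job history (placeholder)
        long sortSeconds    = 0;            // a very fast Sort step

        System.out.printf("Shuffle throughput: %.2f MB/s%n",
                throughput(shuffleBytes, shuffleSeconds) / (1024 * 1024));
        double sort = throughput(shuffleBytes, sortSeconds);
        System.out.println(Double.isInfinite(sort)
                ? "Sort throughput: maximal (Sort time is ~0)"
                : String.format("Sort throughput: %.2f MB/s", sort / (1024 * 1024)));
    }
}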

Improving Reduce execution phase

Once the map tasks have processed the data, the map output data, shuffled at different times, needs to be merged into a single reduce input file and sorted by key before a reduce task can use it. The size of a map output depends on the size of its input dataset. The easiest way to enhance performance in the Shuffle and Sort phases is to reduce the amount of data to be sorted and merged. This is typically done by using combiners, data compression, and/or data filtering. (Using combiners and implementing compression is discussed in Chapter 6, Optimizing MapReduce Tasks.)

Local disk issues and network problems are common sources of performance issues in the Shuffle and Sort phases as the MapReduce framework reads the spilled local inputs and feeds them to the reduce code.

Within the Reduce phase, a high volume of network traffic is usually observed due to the data transfer between map and reduce tasks. Hadoop has several configuration parameters that you can tune in order to enhance its performance in this phase. The parameters most commonly tuned to enhance the Reduce phase are the following:

· mapred.reduce.parallel.copies (default value: 5): This parameter controls the number of parallel threads each reduce task uses to copy (fetch) map outputs during the shuffle phase.

· io.sort.factor (default value: 10): This sort factor parameter indicates the number of files to merge together in a single pass. Depending on your DataNode memory size, this value should be set to one-tenth of the amount of memory defined by io.sort.mb.

· mapred.job.reduce.input.buffer.percent (default value: 0.0): This parameter determines the percentage of memory, relative to the maximum heap size, used to retain map outputs during the reduce phase.

· mapred.job.shuffle.input.buffer.percent (default value: 0.70): This parameter determines the percentage of the maximum heap size allocated to storing map outputs during the shuffle phase.

By setting the mapred.job.reduce.input.buffer.percent value to 0.8, up to 80 percent of the heap can be used to keep the reducer input data in memory; the default value of 0.0 means that map outputs are merged onto the local disk rather than held in memory.

It is also recommended to maximize the memory allocated to store map outputs during the Shuffle phase. You can therefore increase the mapred.job.shuffle.input.buffer.percent value up to 0.9 to use 90 percent of the heap size rather than the default 70 percent.
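As a sketch of how these recommendations could be applied programmatically (assuming the classic Hadoop 1.x property names used throughout this chapter, with values you should validate against your own heap sizes), consider the following:

import org.apache.hadoop.conf.Configuration;

// Minimal sketch of applying the reduce-side buffer recommendations above through
// the job Configuration; the same keys can be set in mapred-site.xml instead.
public class ReduceBufferTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Keep reducer input data in memory instead of merging map outputs onto local disk.
        conf.setFloat("mapred.job.reduce.input.buffer.percent", 0.8f);
        // Let the shuffle use up to 90 percent of the reducer's heap for map outputs.
        conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.9f);
        // More parallel copier threads can help on larger clusters (illustrative value).
        conf.setInt("mapred.reduce.parallel.copies", 10);
    }
}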

Tuning map and reduce parameters

Picking the right number of tasks for a job can have a huge impact on Hadoop's performance. In Chapter 4, Identifying Resource Weaknesses, you learned how to configure the number of mappers and reducers correctly. But sizing the number of mappers and reducers correctly is not enough to get the maximum performance out of a MapReduce job. The optimum is reached when every machine in the cluster has something to do at any given time while a job is executing. Remember that the Hadoop framework has more than 180 configuration parameters, and most of them should not be left at their default settings.

In this section, we will present other techniques to calculate the number of mappers and reducers. It may be productive to try more than one optimization method, because the aim is to find the particular configuration for a given job that uses all the available resources of your cluster. The outcome of this tuning is to run as many mappers and reducers in parallel as possible to fully utilize the available resources.

The theoretical upper bound on the number of mappers can be calculated by dividing the input file size by the block size. On the other hand, your input file is processed by machines with a fixed number of CPU cores (#mappers = (number of physical cores - reserved cores) * (0.95 to 1.75)). This means that, to process a 10 GB file on a cluster of three nodes, each with eight CPU cores, and a 256 MB block size, the optimal number of mappers is somewhere between 37 and 40. The upper limit is 10 GB / 0.25 GB = 40. The optimal number of mappers can be calculated as follows (a sketch of this arithmetic follows the list):

· #mappers based on CPU core count = (8 cores - 1 reserved core) * 1.75 = 12.25

· #cluster mapper capacity = 12.25 * 3 = 37.5 ≈ 37
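The following Java sketch (a calculation aid only; the 1.75 factor and the single reserved core are assumptions you should adjust for your own hardware) reproduces this arithmetic:

// Mapper-sizing arithmetic for the example cluster: three nodes, eight cores each,
// one reserved core, 10 GB of input, 256 MB blocks.
public class MapperSizing {
    public static void main(String[] args) {
        int    nodes         = 3;
        int    coresPerNode  = 8;
        int    reservedCores = 1;
        double factor        = 1.75;    // upper end of the 0.95 to 1.75 range
        double inputGb       = 10.0;
        double blockGb       = 0.25;    // 256 MB block size

        double mappersPerNode  = (coresPerNode - reservedCores) * factor;   // 12.25
        double clusterCapacity = mappersPerNode * nodes;                    // 37.5
        long   upperBound      = (long) Math.ceil(inputGb / blockGb);       // 40 input splits

        System.out.printf("Mappers per node: %.2f%n", mappersPerNode);
        System.out.printf("Cluster map capacity: %.1f%n", clusterCapacity);
        System.out.printf("Upper bound (input splits): %d%n", upperBound);
    }
}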

There are different techniques that you can use to determine the number of mappers and reducers accurately. All these techniques are based on calculations and tuned or adjusted according to real-world experiences. You should not focus on just one technique, but try each one of these techniques in your own environment to find the one that lets your cluster run optimally.

In Chapter 3, Detecting System Bottlenecks, we suggested setting the number of reducers to between 50 percent and 99 percent of the cluster capacity so that all the reducers finish in one wave. This technique calculates the number of reducers using the formula: (0.5 to 0.95) * number of nodes * number of reducer slots per node.

In Chapter 4, Identifying Resource Weaknesses, we suggested setting the number of reducer slots equal to the number of mapper slots, or at least to two-thirds of it. Experience shows that this calculation technique is an easy and fast way to set the number of reducers on a small cluster or in a development environment.

To determine the lower bound of the number of reducers, divide the number of CPU cores per node by two ((cores per node) / 2). To determine the upper bound, multiply the number of CPU cores by two (2 * (cores per node)). You can also use the two-thirds-of-mappers technique to pick the number of reducers within this range.

In this section, we suggest two formulas to determine the number of mappers and reducers based on the number of cluster nodes and CPU cores. You can use these formulas as a starting point and then fine-tune the values in your environment to reach the optimum, as follows (a sketch applying both formulas follows the list):

· Use the number of cluster nodes to calculate the number of reducers with the formula: number of reducers = (0.95 to 1.75) * (number of nodes * mapred.reduce.parallel.copies). Here, 0.95 to 1.75 is the CPU hyper-threading factor, and mapred.reduce.parallel.copies is the configuration parameter that sets the number of parallel copier threads each reducer uses to fetch map outputs.

· Use the number of CPU cores to calculate the number of reducers with the formula: number of reducers = (0.95 to 1.75) * (number of CPU cores - 1).
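The following Java sketch applies both formulas, together with the lower and upper bounds from the previous paragraph, to the example cluster used in this chapter; all input values are assumptions to be replaced with your own cluster's figures:

// Reducer-sizing sketch for the example cluster: three nodes, four cores per node,
// mapred.reduce.parallel.copies = 5. The 1.75 factor is the upper end of the
// 0.95 to 1.75 range used in this chapter.
public class ReducerSizing {
    public static void main(String[] args) {
        int    nodes          = 3;
        int    coresPerNode   = 4;
        int    parallelCopies = 5;
        double factor         = 1.75;

        int lowerBound = coresPerNode / 2;                           // 2
        int upperBound = 2 * coresPerNode;                           // 8
        double byNodes = factor * nodes * parallelCopies;            // 26.25, about 26 reducers
        double byCores = factor * (coresPerNode - 1);                // 5.25, about 5 reducers

        System.out.printf("Reducer bounds: %d to %d%n", lowerBound, upperBound);
        System.out.printf("Reducers (node-based formula): %.0f%n", Math.floor(byNodes));
        System.out.printf("Reducers (core-based formula): %.0f%n", Math.floor(byCores));
    }
}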

Let's return to our cluster example (three nodes, each with one CPU with four cores, 4 GB of RAM, and 40 GB of HDD space, which we used to create the baseline in Chapter 3, Detecting System Bottlenecks). The following summarizes the different calculation techniques:

Technique 1: Set the number of reducers between 50 percent and 99 percent of the cluster capacity
Formulas: #mappers = (CPU cores - 1) * 1.75; #reducers = (0.5 to 0.95) * number of nodes * number of reducer slots
#mappers: (4 - 1) * 1.75 = 5.25 ≈ 5
#reducers: 0.95 * 3 * 5 = 14.25 ≈ 14

Technique 2: Determine the reducers' upper and lower bounds
Formulas: #mappers = (CPU cores - 1) * 1.75; #reducers lower bound = (cores per node) / 2; #reducers upper bound = 2 * (cores per node). Applying the two-thirds-of-mappers technique gives 5 * 2/3 = 3.33 ≈ 3, which is within the range.
#mappers: 5
#reducers: lower bound = 4 / 2 = 2; upper bound = 2 * 4 = 8

Technique 3: Based on the number of nodes
Formulas: #mappers = (CPU cores - 1) * 1.75; #reducers = 1.75 * (number of nodes * mapred.reduce.parallel.copies)
#mappers: 5
#reducers: 1.75 * (3 * 5) = 26.25 ≈ 26

Technique 4: Based on the number of CPU cores
Formulas: #mappers = (CPU cores - 1) * 1.75; #reducers = (0.95 to 1.75) * (number of CPU cores - 1)
#mappers: 5
#reducers: 1.75 * 3 = 5.25 ≈ 5

We applied the calculation results of the third and fourth techniques to the test cluster environment and reported the results in the Tuned 2 and Tuned 3 columns of the following table (the Baseline and Tuned 1 columns are reported from Chapter 3, Detecting System Bottlenecks). In comparison to the Tuned 1 column, we use a larger block size (256 MB instead of 128 MB in the previous iteration) and allocate more memory (-Xmx550m) using the mapred.child.java.opts parameter:

Hadoop parameter | Baseline | Tuned 1 | Tuned 2 | Tuned 3
dfs.replication | 3 | 2 | 2 | 2
dfs.block.size | 67,108,864 | 134,217,728 | 268,435,456 | 268,435,456
dfs.namenode.handler.count | 10 | 20 | 20 | 20
dfs.datanode.handler.count | 3 | 5 | 5 | 5
io.sort.factor | 10 | 35 | 35 | 35
io.sort.mb | 100 | 350 | 350 | 350
mapred.tasktracker.map.tasks.maximum | 2 | 5 | 5 | 5
mapred.map.tasks | 2 | 2 | 2 | 2
mapred.reduce.tasks | 1 | 8 | 26 | 5
mapred.tasktracker.reduce.tasks.maximum | 2 | 5 | 5 | 5
mapred.reduce.parallel.copies | 5 | 5 | 5 | 5
mapred.job.reduce.input.buffer.percent | 0 | 0 | 0 | 0
mapred.child.java.opts | -Xmx200m | -Xmx500m | -Xmx550m | -Xmx550m
Input data size | 10 GB | 10 GB | 10 GB | 10 GB
Cluster's nodes number | 3 | 3 | 3 | 3
Job execution time (sec) | 243 | 185 | 169 | 190
Improvement over Baseline (%) | N/A | 23.86% | 30.45% | 21.81%

These jobs were all run in a test environment using autogenerated data. You may get different results when trying these techniques on your own cluster.

Summary

In this chapter, we learned about map-side and reduce-side task enhancement and introduced some techniques that may help you improve the performance of your MapReduce jobs. We learned how important the impact of the block size is and how to identify slow map-side performance caused by small and unsplittable files. We also learned about spilled records and how to eliminate additional spills by allocating an accurately sized memory buffer.

Then, we moved ahead and learned how to identify a low performance job within the Shuffle and Merge steps during the Reduce phase. In the last section, we covered different techniques to calculate mappers' and reducers' numbers to tune your MapReduce configuration and enhance its performance.

In the next chapter, we will learn more about the optimization of MapReduce task and take a look at how combiners and intermediate data compression will enhance the MapReduce job performance. Keep reading!