Hadoop MapReduce v2 Cookbook, Second Edition (2015)

Chapter 3. Hadoop Essentials – Configurations, Unit Tests, and Other APIs

In this chapter, we will cover:

· Optimizing Hadoop YARN and MapReduce configurations for cluster deployments

· Shared user Hadoop clusters – using Fair and Capacity schedulers

· Setting classpath precedence to user-provided JARs

· Speculative execution of straggling tasks

· Unit testing Hadoop MapReduce applications using MRUnit

· Integration testing Hadoop MapReduce applications using MiniYarnCluster

· Adding a new DataNode

· Decommissioning DataNodes

· Using multiple disks/volumes and limiting HDFS disk usage

· Setting the HDFS block size

· Setting the file replication factor

· Using the HDFS Java API

Introduction

This chapter describes how to perform advanced administration steps in your Hadoop cluster, how to develop unit and integration tests for Hadoop MapReduce programs, and how to use the HDFS Java API. This chapter assumes that you have followed the first chapter and have installed Hadoop in a clustered or pseudo-distributed setup.

Note

Sample code and data

The sample code files for this book are available in GitHub at https://github.com/thilg/hcb-v2. The chapter3 folder of the code repository contains the sample source code files for this chapter.

The sample code can be compiled and built by issuing the gradle build command in the chapter3 folder of the code repository. Project files for the Eclipse IDE can be generated by running the gradle eclipse command in the main folder of the code repository. Project files for the IntelliJ IDEA IDE can be generated by running the gradle idea command in the main folder of the code repository.

Optimizing Hadoop YARN and MapReduce configurations for cluster deployments

In this recipe, we explore some of the important configuration options of Hadoop YARN and Hadoop MapReduce. Commercial Hadoop distributions typically provide a GUI-based approach to specify Hadoop configurations.

YARN allocates resource containers to the applications based on the resource requests made by the applications and the available resource capacity of the cluster. A resource request by an application would consist of the number of containers required and the resource requirement of each container. Currently, most container resource requirements are specified using the amount of memory. Hence, our focus in this recipe will be mainly on configuring the memory allocation of a YARN cluster.

Getting ready

Set up a Hadoop cluster by following the recipes in the first chapter.

How to do it...

The following instructions will show you how to configure the memory allocation in a YARN cluster. The number of tasks per node is derived using this configuration:

1. The following property specifies the amount of memory (RAM) that can be used by YARN containers in a worker node. It's advisable to set this slightly less than the amount of physical RAM present in the node, leaving some memory for the OS and other non-Hadoop processes. Add or modify the following lines in the yarn-site.xml file:

<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>100240</value>
</property>

2. The following property specifies the minimum amount of memory (RAM) that can be allocated to a YARN container in a worker node. Add or modify the following lines in the yarn-site.xml file to configure this property.

If we assume that all YARN resource requests ask for containers with only the minimum amount of memory, the maximum number of concurrent resource containers that can execute in a node equals (YARN memory per node specified in step 1)/(YARN minimum allocation configured below). Based on this relationship, we can use the value of the following property to achieve the desired number of resource containers per node; a worked example follows this list.

The recommended number of resource containers per node is less than or equal to the minimum of (2 * number of CPU cores) and (2 * number of disks).

<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>3072</value>
</property>

3. Restart the YARN ResourceManager and NodeManager services by running sbin/stop-yarn.sh and sbin/start-yarn.sh from the HADOOP_HOME directory.
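As a quick sanity check with the sample values above (which are illustrative rather than prescriptive): 100240 MB of YARN memory per node divided by a 3072 MB minimum allocation yields roughly 32 concurrent containers per node, which would suit a worker node with 16 CPU cores and 16 disks (2 * 16 = 32).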

The following instructions will show you how to configure the memory requirements of the MapReduce applications.

4. The following properties define the maximum amount of memory (RAM) that will be available to each Map and Reduce task. These memory values will be used when MapReduce applications request resources from YARN for Map and Reduce task containers. Add the following lines to the mapred-site.xml file:

<property>
<name>mapreduce.map.memory.mb</name>
<value>3072</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>6144</value>
</property>

5. The following properties define the JVM heap size of the Map and Reduce tasks respectively. Set these values to be slightly less than the corresponding values in step 4, so that they won't exceed the resource limits of the YARN containers. Add the following lines to the mapred-site.xml file:

<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx2560m</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx5120m</value>
</property>

How it works...

We can control Hadoop configurations through the following four configuration files. Hadoop reloads the configurations from these configuration files after a cluster restart:

· core-site.xml: Contains the configurations common to the whole Hadoop distribution

· hdfs-site.xml: Contains configurations for HDFS

· mapred-site.xml: Contains configurations for MapReduce

· yarn-site.xml: Contains configurations for the YARN ResourceManager and NodeManager processes

Each configuration file has name-value pairs expressed in XML format, defining the configurations of different aspects of Hadoop. The following is an example of a property in a configuration file. The <configuration> tag is the top-level parent XML container, and <property> tags, which define individual properties, are specified as child tags inside the <configuration> tag:

<configuration>

<property>

<name>mapreduce.reduce.shuffle.parallelcopies</name>

<value>20</value>

</property>

...

</configuration>

Some configurations can be configured on a per-job basis using the job.getConfiguration().set(name, value) method from the Hadoop MapReduce job driver code.
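For example, a driver could override the shuffle parallelism shown above for a single job. The following is a minimal sketch (assumption: it reuses the WordCountWithTools driver class from this book's samples; only the job.getConfiguration().set(...) call itself comes from this recipe):

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
// Overrides mapreduce.reduce.shuffle.parallelcopies for this job only,
// taking precedence over the value in mapred-site.xml
job.getConfiguration().set("mapreduce.reduce.shuffle.parallelcopies", "20");
job.setJarByClass(WordCountWithTools.class);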

There's more...

There are many similar important configuration properties defined in Hadoop. The following are some of them:

conf/core-site.xml:

· fs.inmemory.size.mb (default value: 200): Amount of memory in MB allocated to the in-memory filesystem that is used to merge map outputs at the reducers

· io.file.buffer.size (default value: 131072): Size of the read/write buffer used by sequence files

conf/mapred-site.xml:

· mapreduce.reduce.shuffle.parallelcopies (default value: 20): Maximum number of parallel copies the reduce step will execute to fetch output from the parallel map tasks

· mapreduce.task.io.sort.factor (default value: 50): Maximum number of streams merged while sorting files

· mapreduce.task.io.sort.mb (default value: 200): Memory limit in MB while sorting data

conf/hdfs-site.xml:

· dfs.blocksize (default value: 134217728): HDFS block size

· dfs.namenode.handler.count (default value: 200): Number of server threads to handle RPC calls in the NameNode

Note

You can find a list of deprecated properties in the latest version of Hadoop and the new replacement properties for them at http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html.

The following documents provide the list of properties, their default values, and the descriptions of each of the configuration files mentioned earlier:

· Common configuration: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/core-default.xml

· HDFS configuration: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml

· YARN configuration: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-common/yarn-default.xml

· MapReduce configuration: http://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml

Shared user Hadoop clusters – using Fair and Capacity schedulers

The Hadoop YARN scheduler is responsible for assigning resources to the applications submitted by users. In Hadoop YARN, these can be any YARN application in addition to MapReduce applications. Currently, the default YARN resource allocation is based on the memory requirements of the application, while resource allocation based on other resources such as CPU can be configured additionally.

Hadoop YARN supports a pluggable scheduling framework, where the cluster administrator has the choice of selecting an appropriate scheduler for the cluster. By default, YARN supports a First in First out (FIFO) scheduler, which executes jobs in the same order as they arrive using a queue of jobs. However, FIFO scheduling might not be the best option for large multi-user Hadoop deployments, where the cluster resources have to be shared across different users and different applications to ensure maximum productivity from the cluster. Please note that commercial Hadoop distributions may use a different scheduler such as Fair scheduler (for example, Cloudera CDH) or Capacity scheduler (for example, Hortonworks HDP) as the default YARN scheduler.

In addition to the default FIFO scheduler, YARN contains the following two schedulers (if required, it is possible for you to write your own scheduler as well):

· Fair scheduler: The Fair scheduler allows all jobs to receive an equal share of resources. The resources are assigned to newly submitted jobs as and when the resources become available until all submitted and running jobs have the same amount of resources. The Fair scheduler ensures that short jobs are completed at a realistic speed, while not starving long-running larger jobs for longer periods. With the Fair scheduler, it's also possible to define multiple queues and queue hierarchies with guaranteed minimum resources to each queue, where the jobs in a particular queue share the resources equally. Resources allocated to any empty queues get divided among the queues with active jobs. The Fair scheduler also allows us to set job priorities, which are used to calculate the proportion of resource distribution inside a queue.

· Capacity scheduler: The Capacity scheduler allows a large cluster to be shared across multiple organizational entities while ensuring guaranteed capacity for each entity and that no single user or job holds all the resources. This allows organizations to achieve economies of scale by maintaining a centralized Hadoop cluster shared between various entities. In order to achieve this, the Capacity scheduler defines queues and queue hierarchies, with each queue having a guaranteed capacity. The Capacity scheduler allows the jobs to use the excess resources (if any) from the other queues.

How to do it...

This recipe describes how to change the scheduler in Hadoop:

1. Shut down the Hadoop cluster.

2. Add the following to the yarn-site.xml file:

<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

3. Restart the Hadoop cluster.

4. Verify that the new scheduler has been applied by going to http://<master-node>:8088/cluster/scheduler in your installation.

How it works...

When you follow the aforementioned steps, Hadoop will load the new scheduler settings when it is started. The Fair scheduler shares an equal amount of resources between users unless it has been configured otherwise.

We can provide an XML formatted allocation file, defining the queues for the Fair scheduler, using the yarn.scheduler.fair.allocation.file property in the yarn-site.xml file.
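For illustration, a minimal allocation file might look roughly like the following (the queue names, minimum resources, and weights here are assumptions for the example, not values from this recipe):

<?xml version="1.0"?>
<allocations>
  <!-- Queue with a guaranteed minimum share of cluster resources -->
  <queue name="analytics">
    <minResources>10240 mb,2 vcores</minResources>
    <weight>2.0</weight>
  </queue>
  <!-- Remaining jobs share the default queue with a smaller weight -->
  <queue name="default">
    <weight>1.0</weight>
  </queue>
</allocations>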

More details about the Fair scheduler and its configurations can be found at https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html.

There's more...

You can enable the Capacity scheduler by adding the following to the yarn-site.xml file and restarting the cluster:

<property>

<name>yarn.resourcemanager.scheduler.class</name>

<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>

</property>

The Capacity scheduler can be configured using the capacity-scheduler.xml file in the Hadoop configuration directory of the ResourceManager node. Issue the following command in the YARN ResourceManager node to load the configuration and to refresh the queues:

$ yarn rmadmin -refreshQueues
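As an illustration, a simple two-queue setup in capacity-scheduler.xml could look roughly like this (the queue names and capacity percentages are assumptions for the example, not values from this recipe):

<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>prod,dev</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.prod.capacity</name>
  <value>70</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.dev.capacity</name>
  <value>30</value>
</property>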

More details about the Capacity scheduler and its configurations can be found at http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html.

Setting classpath precedence to user-provided JARs

While developing Hadoop MapReduce applications, you may encounter scenarios where your MapReduce application requires a newer version of an auxiliary library that is already included in Hadoop. By default, Hadoop gives classpath precedence to the libraries included with Hadoop, which can result in conflicts with the version of the library you provide with your applications. This recipe shows you how to configure Hadoop to give classpath precedence to user-provided libraries.

How to do it...

The following steps show you how to add external libraries to the Hadoop task classpath and how to provide precedence to user-supplied JARs:

1. Set the following property in the driver program of your MapReduce computation:

job.getConfiguration().set("mapreduce.job.user.classpath.first","true");

2. Use the -libjars option in the Hadoop command to provide your libraries, as follows:

$ hadoop jar hcb-c3-samples.jar \
chapter3.WordCountWithTools \
-libjars guava-15.0.jar \
InDir OutDir …

How it works...

Hadoop will copy the JARs specified by -libjars into the Hadoop DistributedCache and make them available in the classpath of all the tasks belonging to this particular job. When mapreduce.job.user.classpath.first is set to true, the user-supplied JARs are placed in the task classpath ahead of the default Hadoop JARs and Hadoop dependencies.

Speculative execution of straggling tasks

One of the main advantages of using Hadoop MapReduce is the framework-managed fault tolerance. When performing a large-scale distributed computation, parts of the computation can fail due to external causes such as network failures, disk failures, and node failures. When Hadoop detects an unresponsive task or a failed task, Hadoop will re-execute those tasks in a new node.

A Hadoop cluster may consist of heterogeneous nodes, and as a result there can be very slow nodes as well as fast nodes. Potentially, a few slow nodes and the tasks executing on those nodes can dominate the execution time of a computation. Hadoop introduces speculative execution optimization to avoid these slow-running tasks, which are called stragglers.

When most of the Map (or Reduce) tasks of a computation are completed, the Hadoop speculative execution feature will schedule duplicate executions of the remaining slow tasks in available alternate nodes. The slowness of a task is decided relative to the running time taken by the other tasks of the same computation. From a set of duplicate tasks, Hadoop will choose the results from the first completed task and will kill any other duplicate executions of that task.

How to do it...

By default, speculative executions are enabled in Hadoop for both Map and Reduce tasks. If such duplicate executions are undesirable for your computations for some reason, you can disable (or enable) speculative executions as follows:

1. Run the WordCount sample passing the following option as an argument:

$ hadoop jar hcb-c3-samples.jar chapter3.WordCountWithTools \
-Dmapreduce.map.speculative=false \
-Dmapreduce.reduce.speculative=false \
/data/input1 /data/output1

2. However, the preceding command only works if the job implements the org.apache.hadoop.util.Tool interface. Otherwise, set these properties in the MapReduce driver program using the following methods (see the sketch after this list):

· For the whole job, use job.setSpeculativeExecution(boolean specExec)

· For Map tasks, use job.setMapSpeculativeExecution(boolean specExec)

· For Reduce tasks, use job.setReduceSpeculativeExecution(boolean specExec)
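In a driver program, this could look roughly as follows (a minimal sketch; the job setup around these calls is assumed):

Job job = Job.getInstance(getConf(), "word count");
// Turn off duplicate (speculative) attempts for both phases of this job only
job.setMapSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);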

There's more...

You can configure the maximum number of retry attempts for a task using the mapreduce.map.maxattempts and mapreduce.reduce.maxattempts properties for Map and Reduce tasks, respectively. Hadoop declares a task as failed after it exceeds the given number of retries. You can also use the JobConf.setMaxMapAttempts() and JobConf.setMaxReduceAttempts() functions to configure these properties. The default value for these properties is 4.

Unit testing Hadoop MapReduce applications using MRUnit

MRUnit is a JUnit-based Java library that allows us to unit test Hadoop MapReduce programs. This makes it easy to develop as well as to maintain Hadoop MapReduce code bases. MRUnit supports testing Mappers and Reducers separately as well as testing MapReduce computations as a whole. In this recipe, we'll be exploring all three testing scenarios. The source code for the test program used in this recipe is available in the chapter3\test\chapter3\WordCountWithToolsTest.java file in the Git repository.

Getting ready

We use Gradle as the build tool for our sample code base. If you haven't already done so, please install Gradle by following the instructions given in the introduction section of Chapter 1, Getting Started with Hadoop v2.

How to do it...

The following steps show you how to perform unit testing of a Mapper using MRUnit:

1. In the setUp method of the test class, initialize an MRUnit MapDriver instance with the Mapper class you want to test. In this example, we are going to test the Mapper of the WordCount MapReduce application we discussed in earlier recipes:

public class WordCountWithToolsTest {

  MapDriver<Object, Text, Text, IntWritable> mapDriver;

  @Before
  public void setUp() {
    WordCountWithTools.TokenizerMapper mapper = new WordCountWithTools.TokenizerMapper();
    mapDriver = MapDriver.newMapDriver(mapper);
  }
  ……
}

2. Write a test function to test the Mapper logic. Provide the test input to the Mapper using the MapDriver.withInput method. Then, provide the expected result of the Mapper execution using the MapDriver.withOutput method. Now, invoke the test using the MapDriver.runTest method. The MapDriver.withAll and MapDriver.withAllOutput methods allow us to provide a list of test inputs and a list of expected outputs, rather than adding them individually.

@Test
public void testWordCountMapper() throws IOException {
  IntWritable inKey = new IntWritable(0);
  mapDriver.withInput(inKey, new Text("Test Quick"));
  ….
  mapDriver.withOutput(new Text("Test"), new IntWritable(1));
  mapDriver.withOutput(new Text("Quick"), new IntWritable(1));
  …
  mapDriver.runTest();
}

The following step shows you how to perform unit testing of a Reducer using MRUnit.

3. Similar to steps 1 and 2, initialize a ReduceDriver by providing the Reducer class under test and then configure the ReduceDriver with the test input and the expected output. The input to the reduce function should conform to a key with a list of values. Also, in this test, we use the ReduceDriver.withAllOutput method to provide a list of expected outputs.

public class WordCountWithToolsTest {
  ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;

  @Before
  public void setUp() {
    WordCountWithTools.IntSumReducer reducer = new WordCountWithTools.IntSumReducer();
    reduceDriver = ReduceDriver.newReduceDriver(reducer);
  }

  @Test
  public void testWordCountReduce() throws IOException {
    ArrayList<IntWritable> reduceInList = new ArrayList<IntWritable>();
    reduceInList.add(new IntWritable(1));
    reduceInList.add(new IntWritable(2));

    reduceDriver.withInput(new Text("Quick"), reduceInList);
    ...
    ArrayList<Pair<Text, IntWritable>> reduceOutList = new ArrayList<Pair<Text, IntWritable>>();
    reduceOutList.add(new Pair<Text, IntWritable>(new Text("Quick"), new IntWritable(3)));
    ...
    reduceDriver.withAllOutput(reduceOutList);
    reduceDriver.runTest();
  }
}

The following steps show you how to perform unit testing on a whole MapReduce computation using MRUnit.

4. Initialize a MapReduceDriver by providing the Mapper class and Reducer class of the MapReduce program that you want to test. Then, configure the MapReduceDriver with the test input data and the expected output data. When executed, this test will execute the MapReduce execution flow starting from the Map input stage to the Reduce output stage. It's possible to provide a combiner implementation to this test as well.

public class WordCountWithToolsTest {
  ……
  MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

  @Before
  public void setUp() {
    ....
    mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
  }

  @Test
  public void testWordCountMapReduce() throws IOException {
    IntWritable inKey = new IntWritable(0);
    mapReduceDriver.withInput(inKey, new Text("Test Quick"));
    ……
    ArrayList<Pair<Text, IntWritable>> reduceOutList = new ArrayList<Pair<Text, IntWritable>>();
    reduceOutList.add(new Pair<Text, IntWritable>(new Text("Quick"), new IntWritable(2)));
    ……
    mapReduceDriver.withAllOutput(reduceOutList);
    mapReduceDriver.runTest();
  }
}

5. The Gradle build script (or any other Java build mechanism) can be configured to execute these unit tests with every build. We can add the MRUnit dependency to the Gradle build (chapter3/build.gradle) file as follows:

dependencies {
  testCompile group: 'org.apache.mrunit', name: 'mrunit', version: '1.1.+', classifier: 'hadoop2'
  ……
}

6. Use the following Gradle command to execute only the WordCountWithToolsTest unit test. This command executes any test class that matches the pattern **/WordCountWith*.class:

$ gradle -Dtest.single=WordCountWith test
:chapter3:compileJava UP-TO-DATE
:chapter3:processResources UP-TO-DATE
:chapter3:classes UP-TO-DATE
:chapter3:compileTestJava UP-TO-DATE
:chapter3:processTestResources UP-TO-DATE
:chapter3:testClasses UP-TO-DATE
:chapter3:test
BUILD SUCCESSFUL
Total time: 27.193 secs

7. You can also execute MRUnit-based unit tests in your IDE. You can use the gradle eclipse or gradle idea commands to generate the project files for the Eclipse and IntelliJ IDEA IDEs respectively.

See also

· The Integration testing Hadoop MapReduce applications using MiniYarnCluster recipe in this chapter

· For more information about using MRUnit, go to https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+Tutorial

Integration testing Hadoop MapReduce applications using MiniYarnCluster

While unit testing using MRUnit is very useful, there can be certain integration test scenarios that have to be tested in a cluster environment. MiniYARNCluster of Hadoop YARN is a cluster simulator that we can use to create a testing environment for such integration tests. In this recipe, we'll be using MiniYARNCluster to perform integration testing of the WordCountWithTools MapReduce application.

The source code for the test program used in this recipe is available in the chapter3\test\chapter3\minicluster\WordCountMiniClusterTest.java file in the Git repository.

Getting ready

We use Gradle as the build tool for our sample code base. If you haven't already done so, please install Gradle by following the instructions given in the introduction section of Chapter 1, Getting Started with Hadoop v2. Export the JAVA_HOME environmental variable pointing to your JDK installation.

How to do it...

The following steps show you how to perform integration testing of a MapReduce application using the MiniYarnCluster environment:

1. Within the setup method of your JUnit test, create an instance of MiniYarnCluster using MiniMRClientClusterFactory as follows. MiniMRClientCluster is a wrapper interface around MiniMRYarnCluster that provides a uniform testing API independent of whether the underlying cluster is Hadoop 1.x or Hadoop 2.x.

public class WordCountMiniClusterTest {
  private static MiniMRClientCluster mrCluster;
  private class InternalClass {
  }

  @BeforeClass
  public static void setup() throws IOException {
    // create the mini cluster to be used for the tests
    mrCluster = MiniMRClientClusterFactory.create(InternalClass.class, 1, new Configuration());
  }
}

2. Make sure to stop the cluster in the cleanup method of your test:

@AfterClass
public static void cleanup() throws IOException {
  // stopping the mini cluster
  mrCluster.stop();
}

3. Within your test method, prepare a MapReduce computation using the configuration object of the MiniYARNCluster we just created. Submit the job and wait for its completion. Then test whether the job was successful.

@Test
public void testWordCountIntegration() throws Exception {
  ……
  Job job = (new WordCountWithTools()).prepareJob(testInput, outDirString, mrCluster.getConfig());
  // Make sure the job completes successfully
  assertTrue(job.waitForCompletion(true));
  validateCounters(job.getCounters(), 12, 367, 201, 201);
}

4. In this example, we use the counters to validate the expected results of the MapReduce computation. You may also implement logic to compare the output data of the computation with the expected output; a sketch of such an output comparison follows this snippet. However, care must be taken to handle the possibility of having multiple output files due to the presence of multiple Reduce tasks.

private void validateCounters(Counters counters, long mapInputRecords,…) {
  assertEquals("MapInputRecords", mapInputRecords, counters.findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue());
  ………
}
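One possible sketch of such an output comparison is shown below (assumptions: outDirString and the expected word counts are defined elsewhere in the test, and the usual java.io and org.apache.hadoop.fs imports are in place):

// Each Reduce task writes its own part-r-NNNNN file, so glob for all of them
FileSystem fs = FileSystem.get(mrCluster.getConfig());
for (FileStatus status : fs.globStatus(new Path(outDirString, "part-r-*"))) {
  BufferedReader reader = new BufferedReader(
      new InputStreamReader(fs.open(status.getPath())));
  String line;
  while ((line = reader.readLine()) != null) {
    // each line is "word<TAB>count"; compare it against the expected results here
  }
  reader.close();
}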

5. Use the following Gradle command to execute only the WordCountMiniClusterTest JUnit test. This command executes any test class that matches the pattern **/WordCountMini*.class.

$ gradle -Dtest.single=WordCountMini test
:chapter3:compileJava UP-TO-DATE
:chapter3:processResources UP-TO-DATE
:chapter3:classes UP-TO-DATE
:chapter3:compileTestJava UP-TO-DATE
:chapter3:processTestResources UP-TO-DATE
:chapter3:testClasses UP-TO-DATE
:chapter3:test UP-TO-DATE

BUILD SUCCESSFUL

6. You can also execute MiniYarnCluster-based unit tests in your IDE. You can use the gradle eclipse or gradle idea commands to generate the project files for the Eclipse and IntelliJ IDEA IDEs respectively.

See also

· The Unit testing Hadoop MapReduce applications using MRUnit recipe in this chapter

· The Hadoop counters for reporting custom metrics recipe in Chapter 4, Developing Complex Hadoop MapReduce Applications

Adding a new DataNode

This recipe shows you how to add new nodes to an existing HDFS cluster without restarting the whole cluster, and how to force HDFS to rebalance after the addition of new nodes. Commercial Hadoop distributions typically provide a GUI-based approach to add and remove DataNodes.

Getting ready

1. Install Hadoop on the new node and replicate the configuration files of your existing Hadoop cluster. You can use rsync to copy the Hadoop configuration from another node; for example:

$ rsync -a <master_node_ip>:$HADOOP_HOME/etc/hadoop/ $HADOOP_HOME/etc/hadoop

2. Ensure that the master node of your Hadoop/HDFS cluster can perform password-less SSH to the new node. Password-less SSH setup is optional if you are not planning to use the bin/*.sh scripts from the master node to start/stop the cluster.

How to do it...

The following steps will show you how to add a new DataNode to an existing HDFS cluster:

1. Add the IP or the DNS of the new node to the $HADOOP_HOME/etc/hadoop/slaves file in the master node.

2. Start the DataNode on the newly added slave node by using the following command:

$ $HADOOP_HOME/sbin/hadoop-daemon.sh start datanode

Tip

You can also use the $HADOOP_HOME/sbin/start-dfs.sh script from the master node to start the DataNode daemons in the newly added nodes. This is helpful if you are adding more than one new DataNode to the cluster.

3. Check $HADOOP_HOME/logs/hadoop-*-datanode-*.log in the new slave node for any errors.

These steps apply to both adding a new node as well as rejoining a node that has crashed and restarted.

There's more...

Similarly, you can add a new node to the Hadoop YARN cluster as well:

1. Start the NodeManager in the new node using the following command:

> $HADOOP_HOME/sbin/yarn-daemon.sh start nodemanager

2. Check $HADOOP_HOME/logs/yarn-*-nodemanager-*.log in the new slave node for any errors.

Rebalancing HDFS

When you add new nodes, HDFS will not rebalance automatically. However, HDFS provides a rebalancer tool that can be invoked manually. This tool balances the data blocks across the cluster up to an optional threshold percentage. Rebalancing is very helpful if you are having space issues in the existing nodes.

1. Execute the following command:

> $HADOOP_HOME/sbin/start-balancer.sh -threshold 15

The (optional) -threshold parameter specifies the percentage of disk capacity leeway to consider when identifying a node as under- or over-utilized. An under-utilized DataNode is a node whose utilization is less than (average utilization - threshold). An over-utilized DataNode is a node whose utilization is greater than (average utilization + threshold). For example, with a threshold of 15 and an average utilization of 60 percent, nodes below 45 percent are considered under-utilized and nodes above 75 percent are considered over-utilized. Smaller threshold values will achieve more evenly balanced nodes, but will take more time for the rebalancing. The default threshold value is 10 percent.

2. Rebalancing can be stopped by executing the sbin/stop-balancer.sh command.

3. A summary of the rebalancing is available in the $HADOOP_HOME/logs/hadoop-*-balancer*.out file.

See also

The Decommissioning DataNodes recipe in this chapter.

Decommissioning DataNodes

There can be multiple situations where you want to decommission one or more DataNodes from an HDFS cluster. This recipe shows how to gracefully decommission DataNodes without incurring data loss.

How to do it...

The following steps show you how to decommission DataNodes gracefully:

1. If your cluster doesn't have it, add an exclude file to the cluster. Create an empty file in the NameNode and point to it from the $HADOOP_HOME/etc/hadoop/hdfs-site.xml file by adding the following property. Restart the NameNode:

<property>
<name>dfs.hosts.exclude</name>
<value>FULL_PATH_TO_THE_EXCLUDE_FILE</value>
<description>Names a file that contains a list of hosts that are not permitted to connect to the namenode. The full pathname of the file must be specified. If the value is empty, no hosts are excluded.</description>
</property>

2. Add the hostnames of the nodes that are to be decommissioned to the exclude file.

3. Run the following command to reload the NameNode configuration:

$ hdfs dfsadmin -refreshNodes

This will start the decommissioning process. This process can take a significant amount of time as it requires replication of data blocks without overwhelming the other tasks of the cluster.

4. The progress of the decommissioning process is shown in the HDFS UI under the Decommissioning Nodes page. The progress can be monitored using the following command as well. Do not shut down the nodes until the decommissioning is complete.

$ hdfs dfsadmin -report
......
......
Name: myhost:50010
Decommission Status : Decommission in progress
Configured Capacity: ....
......

5. You can remove the nodes from the exclude file and execute the hdfs dfsadmin -refreshNodes command when you want to add the nodes back into the cluster.

6. The decommissioning process can be stopped by removing the node name from the exclude file and then executing the hdfs dfsadmin -refreshNodes command.

How it works...

When a node is in the decommissioning process, HDFS replicates the blocks in that node to the other nodes in the cluster. Decommissioning can be a slow process as HDFS purposely does it slowly to avoid overwhelming the cluster. Shutting down nodes without decommissioning may result in data loss.

After the decommissioning is complete, the nodes mentioned in the exclude file are not allowed to communicate with the NameNode.

See also

The Rebalancing HDFS section of the Adding a new DataNode recipe in this chapter.

Using multiple disks/volumes and limiting HDFS disk usage

Hadoop supports specifying multiple directories for the DataNode data directory. This feature allows us to utilize multiple disks/volumes to store data blocks in DataNodes. Hadoop tries to store equal amounts of data in each directory. It also supports limiting the amount of disk space used by HDFS.

How to do it...

The following steps will show you how to add multiple disk volumes:

1. Create HDFS data storage directories in each volume.

2. Locate the hdfs-site.xml configuration file. Provide a comma-separated list of directories corresponding to the data storage locations in each volume under the dfs.datanode.data.dir property as follows:

<property>
<name>dfs.datanode.data.dir</name>
<value>/u1/hadoop/data, /u2/hadoop/data</value>
</property>

3. In order to limit disk usage, add the following property to the hdfs-site.xml file to reserve space for non-DFS usage. The value specifies the number of bytes that HDFS cannot use per volume:

<property>
<name>dfs.datanode.du.reserved</name>
<value>6000000000</value>
<description>Reserved space in bytes per volume. Always leave this much space free for non dfs use.
</description>
</property>

Setting the HDFS block size

HDFS stores files across the cluster by breaking them down into coarser-grained, fixed-size blocks. The default HDFS block size in Hadoop v2 is 128 MB. The block size of a dataset can affect the performance of filesystem operations, where larger block sizes would be more effective if you are storing and processing very large files. The block size of a dataset can also affect the performance of MapReduce computations, as the default behavior of Hadoop is to create one Map task for each data block of the input files; for example, a 1 GB input file with 128 MB blocks produces eight Map tasks.

How to do it...

The following steps show you how to use the NameNode configuration file to set the HDFS block size:

1. Add or modify the following code in the $HADOOP_HOME/etc/hadoop/hdfs-site.xml file. The block size is provided using the number of bytes. This change would not change the block size of the files that are already in the HDFS. Only the files copied after the change will have the new block size.

<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>

2. You can specify different HDFS block sizes for specific file paths as well. You can also specify the block size when uploading a file to HDFS from the command line as follows:

$ hdfs dfs \
-Ddfs.blocksize=134217728 \
-put data.in foo/test

There's more...

You can also specify the block size when creating files with the HDFS Java API, in the following manner:

public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
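For example, a 256 MB block size could be requested at file-creation time roughly as follows (a sketch; the file path, buffer size, and replication values here are illustrative assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateWithBlockSize {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Request 256 MB blocks for this file only: overwrite=true,
    // a 4 KB write buffer, and a replication factor of 3
    FSDataOutputStream out = fs.create(new Path("large-data.bin"),
        true, 4096, (short) 3, 256L * 1024 * 1024);
    out.writeUTF("sample content");
    out.close();
    fs.close();
  }
}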

You can use the fsck command to find the block size and block locations of a particular file path in the HDFS. You can find this information by browsing the filesystem from the HDFS monitoring console as well.

> $HADOOP_HOME/bin/hdfs fsck \

/user/foo/data.in \

-blocks -files -locations

......

/user/foo/data.in 215227246 bytes, 2 block(s): ....

0. blk_6981535920477261584_1059 len=134217728 repl=1 [hostname:50010]

1. blk_-8238102374790373371_1059 len=81009518 repl=1 [hostname:50010]

......

See also

The Setting the file replication factor recipe in this chapter.

Setting the file replication factor

HDFS stores files across the cluster by breaking them down into coarser-grained, fixed-size blocks. These coarser-grained data blocks are replicated to different DataNodes mainly for fault-tolerance purposes. Data block replication also has the ability to increase the data locality of the MapReduce computations and to increase the total data access bandwidth as well. Reducing the replication factor helps save storage space in HDFS.

The HDFS replication factor is a file-level property that can be set on a per-file basis. This recipe shows you how to change the default replication factor of an HDFS deployment affecting the new files that will be created afterwards, how to specify a custom replication factor at the time of file creation in HDFS, and how to change the replication factor of existing files in HDFS.

How to do it...

Follow these instructions to set the file replication factor using the NameNode configuration:

1. Add or modify the dfs.replication property in $HADOOP_HOME/etc/hadoop/hdfs-site.xml. This change will not change the replication factor of the files that are already in HDFS. Only the files copied after the change will have the new replication factor. Please be aware that reducing the replication factor decreases the reliability of the stored files and may also decrease the performance when processing that data.

<property>
<name>dfs.replication</name>
<value>2</value>
</property>

2. Set the file replication factor when uploading the files. You can specify the replication factor when uploading the file from the command line as follows:

$ hdfs dfs \
-Ddfs.replication=1 \
-copyFromLocal \
non-critical-file.txt /user/foo

3. Change the file replication factor of the existing file paths. The setrep command can be used to change the replication factor of files or file paths that are already in the HDFS in the following manner:

$ hdfs dfs \
-setrep 2 non-critical-file.txt

Replication 2 set: hdfs://myhost:9000/user/foo/non-critical-file.txt

How it works...

Have a look at the following command:

hdfs dfs -setrep [-R] <replication> <path>

The <replication> parameter of the setrep command specifies the new replication factor, and the <path> parameter specifies the HDFS path where the replication factor has to be changed. The -R option recursively sets the replication factor for files and directories within a directory.

There's more...

The replication factor of a file is displayed when listing the files using the ls command:

$ hdfs dfs -ls

Found 1 item

-rw-r--r-- 2 foo supergroup ... /user/foo/non-critical-file.txt

The replication factor of files is displayed in the HDFS monitoring UI as well.

See also

The Setting the HDFS block size recipe in this chapter.

Using the HDFS Java API

The HDFS Java API can be used to interact with HDFS from any Java program. This API gives us the ability to utilize the data stored in HDFS from other Java programs as well as to process that data with other non-Hadoop computational frameworks. Occasionally, you may also come across a use case where you want to access HDFS directly from within a MapReduce application. However, if you are writing or modifying files in HDFS directly from a Map or Reduce task, please be aware that you are violating the side-effect-free nature of MapReduce, which might lead to data consistency issues based on your use case.

How to do it...

The following steps show you how to use the HDFS Java API to perform filesystem operations on an HDFS installation using a Java program:

1. The following sample program creates a new file in HDFS, writes some text in the newly created file, and reads the file back from HDFS:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSJavaAPIDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    System.out.println(fs.getUri());

    Path file = new Path("demo.txt");

    if (fs.exists(file)) {
      System.out.println("File exists.");
    } else {
      // Writing to file
      FSDataOutputStream outStream = fs.create(file);
      outStream.writeUTF("Welcome to HDFS Java API!!!");
      outStream.close();
    }

    // Reading from file
    FSDataInputStream inStream = fs.open(file);
    String data = inStream.readUTF();
    System.out.println(data);
    inStream.close();

    fs.close();
  }
}

2. Compile and package the preceding program by issuing the gradle build command in the chapter3 folder of the source repository. The hcb-c3-samples.jar file will be created in the build/libs folder.

3. You can execute the preceding sample using the following command. Running this sample using the hadoop script ensures that it uses the currently configured HDFS and the necessary dependencies from the Hadoop classpath.

$ hadoop jar \
hcb-c3-samples.jar \
chapter3.hdfs.javaapi.HDFSJavaAPIDemo

hdfs://yourhost:9000
Welcome to HDFS Java API!!!

4. Use the ls command to list the newly created file, shown as follows:

$ hdfs dfs -ls
Found 1 items
-rw-r--r-- 3 foo supergroup 20 2012-04-27 16:57 /user/foo/demo.txt

How it works...

In order to interact with HDFS programmatically, we first need to obtain a handle to the currently configured filesystem. For this, we instantiate a Configuration object and obtain a FileSystem handle, which will point to the HDFS NameNode of the Hadoop environment where we run this program. Several alternative methods to configure a FileSystem object have been discussed in the Configuring the FileSystem object section in this chapter:

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(conf);

The FileSystem.create(filePath) method creates a new file in the given path and provides us with an FSDataOutputStream object to the newly created file. FSDataOutputStream wraps java.io.DataOutputStream and allows the program to write primitive Java data types to the file. The FileSystem.create() method overwrites the file if it already exists. In this example, the file will be created relative to your HDFS home directory, which would result in a path similar to /user/<user_name>/demo.txt. Your HDFS home directory has to be created beforehand.

Path file = new Path("demo.txt");

FSDataOutputStream outStream = fs.create(file);

outStream.writeUTF("Welcome to HDFS Java API!!!");

outStream.close();

FileSystem.open(filepath) opens an FSDataInputStream to the given file. FSDataInputStream wraps java.io.DataInputStream and allows the program to read primitive Java data types from the file.

FSDataInputStream inStream = fs.open(file);

String data = inStream.readUTF();

System.out.println(data);

inStream.close();

There's more...

The HDFS Java API supports many more filesystem operations than we have used in the preceding sample. The full API documentation can be found at http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html.

Configuring the FileSystem object

We can use the HDFS Java API from outside the Hadoop environment as well. When doing so, we have to explicitly configure the HDFS NameNode and the port. The following are a couple of ways to perform that configuration:

· You can load the configuration files into the Configuration object before retrieving the FileSystem object as follows. Make sure to add all the Hadoop and dependency libraries to the classpath.

Configuration conf = new Configuration();
conf.addResource(new Path("/etc/hadoop/core-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
FileSystem fileSystem = FileSystem.get(conf);

· You can also specify the NameNode and the port as follows. Replace NAMENODE_HOSTNAME and PORT with the hostname and the port of the NameNode of your HDFS installation.

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://NAMENODE_HOSTNAME:PORT");
FileSystem fileSystem = FileSystem.get(conf);

The HDFS filesystem API is an abstraction that supports several filesystems. If the preceding program does not find a valid HDFS configuration, it will point to the local filesystem instead of the HDFS. You can identify the current filesystem of the fileSystem object using the getUri() function as follows. It would result in hdfs://your_namenode:port if it's using a properly configured HDFS and file:/// if it is using the local filesystem.

fileSystem.getUri();

Retrieving the list of data blocks of a file

The getFileBlockLocations() function of the fileSystem object allows you to retrieve the list of data blocks of a file stored in HDFS, together with hostnames where the blocks are stored and the block offsets. This information would be very useful if you are planning on doing any local operations on the file data using a framework other than Hadoop MapReduce.

FileStatus fileStatus = fs.getFileStatus(file);

BlockLocation[] blocks = fs.getFileBlockLocations(

fileStatus, 0, fileStatus.getLen());
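Continuing the preceding fragment, the returned locations could be printed roughly as follows (a sketch; java.util.Arrays is assumed to be imported):

for (BlockLocation block : blocks) {
  // Offset and length of the block within the file, plus the DataNodes holding replicas
  System.out.println(block.getOffset() + "+" + block.getLength()
      + " on " + Arrays.toString(block.getHosts()));
}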