Using Apache Hadoop YARN Distributed-Shell - Apache Hadoop YARN: Moving beyond MapReduce and Batch Processing with Apache Hadoop 2 (2014)

Apache Hadoop YARN: Moving beyond MapReduce and Batch Processing with Apache Hadoop 2 (2014)

11. Using Apache Hadoop YARN Distributed-Shell

The Hadoop YARN project includes the Distributed-Shell application, which is an example of a non-MapReduce application built on top of YARN. Distributed-Shell is a simple mechanism for running shell commands and scripts in containers on multiple nodes in a Hadoop cluster. There are multiple existing implementations of a distributed shell that administrators typically use to manage a cluster of machines, and this application is a way to demonstrate how such a utility can be implemented on top of YARN.

More than just providing a parallel execution application, Distributed-Shell can be used as a starting point for exploring and building Hadoop YARN applications. This chapter is intended to be a guide to how one can use the Distributed-Shell application and, more than that, play with it so as to understand more about how a YARN application can be written as well as how it interacts with YARN for its execution.

Using the YARN Distributed-Shell

For the purpose of examples in the remainder of this chapter, we assume the following installation path of the Distributed-Shell application:

$ export YARN_DS=$YARN_HOME/share/hadoop/yarn

$YARN_HOME should point to the installation directory of YARN. In addition, the Distributed-Shell examples that follow have a version tag defined using the environment variable “$YARN_VERSION.” Change this value to match your installation.

$ export YARN_VERSION=2.2.0

Distributed-Shell exposes various options that can be found by running the following:

$ yarn org.apache.hadoop.yarn.applications.distributedshell.Client –jar \
$YARN_DS/hadoop-yarn-applications-distributedshell-$YARN_VERSION.jar -help

The output of this command follows; we will explore some of these options in the examples illustrated in this chapter.

usage: Client
-appname <arg> Application Name. Default value -
DistributedShell
-container_memory <arg> Amount of memory in MB to be requested to run
the shell command
-debug Dump out debug information
-help Print usage
-jar <arg> Jar file containing the application master
-log_properties <arg> log4j.properties file
-master_memory <arg> Amount of memory in MB to be requested to run
the application master
-num_containers <arg> No. of containers on which the shell command
needs to be executed
-priority <arg> Application Priority. Default 0
-queue <arg> RM Queue in which this application is to be
submitted
-shell_args <arg> Command line args for the shell script
-shell_cmd_priority <arg> Priority for the shell command containers
-shell_command <arg> Shell command to be executed by the
Application Master
-shell_env <arg> Environment for shell script. Specified as
env_key=env_val pairs
-shell_script <arg> Location of the shell script to be executed
-timeout <arg> Application timeout in milliseconds

A Simple Example

The simplest use-case for the Distributed-Shell application is to run an arbitrary shell command in a container. We will demonstrate the use of the uptime command as an example. This command can be run on the cluster using Distributed-Shell as follows:

$ yarn org.apache.hadoop.yarn.applications.distributedshell.Client –jar \
$YARN_DS/hadoop-yarn-applications-distributedshell-$YARN_VERSION.jar \
-shell_command uptime

By default, Distributed-Shell spawns only one instance of a given shell command. When this command is run, one can see log messages on the screen. If the shell command succeeds, the following should appear at the end of the output:

13/10/16 19:34:23 INFO distributedshell.Client: Application completed successfully

If the shell command did not work for whatever reason, the following message will be displayed:

13/10/16 19:36:15 ERROR distributedshell.Client: Application failed to complete successfully

The next step is to examine the output for the application. Distributed-Shell redirects the output of the individual shell commands run on the cluster nodes into the log files, which are found either on the individual nodes or aggregated on to HDFS depending on whether log aggregation is enabled.

Assuming log aggregation is not enabled, the results for each instance of the command are listed by container under an application-id directory. For example, if the contents of the application-id directory are listed, two containers’ directories can be seen:

$ ls $YARN_HOME/logs/userlogs/application_1381961205352_0005
container_1381961205352_0005_01_000001 container_1381961205352_0005_01_000002

Recall that the first container (.._000001) is the ApplicationMaster (the head process). The second container (.._000002) is where the actual command output resides. Within each directory, there are two files, stdout and stderr. For example,

$ ls $YARN_HOME/logs/userlogs/application_1381961205352_0005/container_1381961205352_0005_01_000002/
stderr stdout

If we print the contents of the stdout file, we find the expected result for the uptime command.

19:44:30 up 1 day, 6:53, 6 users, load average: 0.00, 0.00, 0.00

Similarly, one can look for the following output if log aggregation is enabled:

$ yarn logs -applicationId application_1388537987294_0001

This command will show the output for all containers in a single output stream.

Using More Containers

Distributed-Shell can run commands to be executed on any number of containers by way of the -num_containers argument. For example, to see on which nodes the Distributed-Shell command was run, we can use the following:

$ yarn org.apache.hadoop.yarn.applications.distributedshell.Client –jar \
$YARN_DS/hadoop-yarn-applications-distributedshell-$YARN_VERSION.jar \
-shell_command hostname -num_containers 4

If we now examine the results for this job, there will be five containers’ log directories, each containing its own stdout and stderr files.

$ ls $YARN_HOME/logs/userlogs/application_1381961205352_0006/
container_1381961205352_0006_01_000001
container_1381961205352_0006_01_000002
container_1381961205352_0006_01_000003
container_1381961205352_0006_01_000004
container_1381961205352_0006_01_000005

Containers with IDs ranging from 2 to 5 will have in their stdout file the hostname of the machine where the job was run.

Distributed-Shell Examples with Shell Arguments

Arguments can be added to the shell command using the -shell_args option. For example, to do a ls –l in the directory from where the shell command was run, we can use the following commands:

$ yarn org.apache.hadoop.yarn.applications.distributedshell.Client –jar \
$YARN_DS/hadoop-yarn-applications-distributedshell-$YARN_VERSION.jar \
-shell_command ls -shell_args -l

The resulting output is as follows:

total 16
-rw-r--r-- 1 hdfs hadoop 7 Oct 16 20:17 container_tokens
-rwx------ 1 hdfs hadoop 382 Oct 16 20:17 default_container_executor.sh
-rwx------ 1 hdfs hadoop 1303 Oct 16 20:17 launch_container.sh
drwx--x--- 2 hdfs hadoop 4096 Oct 16 20:17 tmp

As you can see, the resulting files are new and not located anywhere in our hdfs or local file system. If we explore a little more using a Distributed-Shell pwd command, we find that these files are in directories of the kind

/hdfs/tmp/usercache/doug/appcache/application_1381961205352_0008/container_1381961205352_0008_01_000002

on the node that executed the shell command.

However, once the application finishes, if we log into the node and search for these files, they may not exist. These transient files are used by YARN to run the Distributed-Shell application and are removed once the application finishes. You can preserve these files for a specific interval by adding the following to the yarn-site.xml configuration file and restarting YARN. You can choose the delay in seconds to suit your needs—these files will be retained on the individual nodes only for the duration of the specified delay.

<property>
<name>yarn.nodemanager.delete.debug-delay-sec</name>
<value>10000000</value>
</property>

These files—in particular, launch_container.sh—are important when debugging YARN applications. Let’s use the Distributed-Shell itself to dig into what this file is about. We can examine the launch_container.sh file with the following command:

$ yarn org.apache.hadoop.yarn.applications.distributedshell.Client –jar \
$YARN_DS/hadoop-yarn-applications-distributedshell-$YARN_VERSION.jar \
-shell_command cat -shell_args launch_container.sh

This command outputs the launch_container.sh file that is created and run by YARN before executing the user-supplied shell utility. The contents of the file are shown in Listing 11.1. The file basically exports some important YARN variables and then, at the end, “execs” the command directly and sends the output to the stdout and stderr files mentioned earlier.

Listing 11.1 Distributed-Shell launch_container.sh file


#!/bin/bash

export NM_HTTP_PORT="8042"
export LOCAL_DIRS="/hdfs/tmp/usercache/doug/appcache/application_1381856870533_0040"
export HADOOP_COMMON_HOME="/opt/yarn/hadoop-$YARN_VERSION"
export JAVA_HOME="/usr/lib/jvm/java-1.6.0-openjdk.x86_64"
export HADOOP_YARN_HOME="/opt/yarn/hadoop-$YARN_VERSION"
export HADOOP_TOKEN_FILE_LOCATION="/hdfs/tmp/usercache/doug/appcache/application_1381856870533_0040/container_1381856870533_0040_01_000002/container_tokens"
export NM_HOST="n2"
export JVM_PID="$$"
export USER="doug"
export HADOOP_HDFS_HOME="/opt/yarn/hadoop-$YARN_VERSION"
export PWD="/hdfs/tmp/usercache/doug/appcache/application_1381856870533_0040/container_1381856870533_0040_01_000002"
export CONTAINER_ID="container_1381856870533_0040_01_000002"
export NM_PORT="45176"
export HOME="/home/"
export LOGNAME="doug"
export HADOOP_CONF_DIR="/opt/yarn/hadoop-$YARN_VERSION/etc/hadoop"
export MALLOC_ARENA_MAX="4"
export LOG_DIRS="/opt/yarn/hadoop-$YARN_VERSION/logs/userlogs/application_1381856870533_0040/container_1381856870533_0040_01_000002"
exec /bin/bash -c "cat launch_container.sh 1>/opt/yarn/hadoop-$YARN_VERSION/logs/userlogs/application_1381856870533_0040/container_1381856870533_0040_01_000002/stdout 2>/opt/yarn/hadoop-$YARN_VERSION/logs/userlogs/application_1381856870533_0040/container_1381856870533_0040_01_000002/stderr "


There are more options for the Distributed-Shell that you can play with. However, as we mentioned earlier, some other existing utilities (such as the pdsh utility) provide easy and feature-rich tools for running simple commands and scripts across the cluster. The real value of the Distributed-Shell application is its showcasing of applications that can run within the Hadoop YARN infrastructure.

We will now delve a little more into the internal details of how Distributed-Shell itself works and how you can modify it, enhance it, or even use it as a scaffolding to write your own YARN applications.

Internals of the Distributed-Shell

The Distributed-Shell is the “hello-world.c” for Hadoop 2. That is, it demonstrates the basic functionality of a YARN application. Once its internal workings are understood, it can be used as a starting point for writing new YARN applications. The source code for the Distributed-Shell can be found in $YARN_HOME/share/hadoop/yarn/sources. These source files can be extracted by running

jar –xf hadoop-yarn-applications-distributedshell-$YARN_VERSION-sources.jar

Three main classes make up (and so can be changed to use it as a template for your own YARN application) the main package of Distributed-Shell—that is, the org.apache.hadoop.yarn.applications.distributedshell package. These classes are:

1. Client

2. ApplicationMaster

3. DSConstants

In addition to making changes to the existing Distributed-Shell application, you may want to write more complex logic than just invoking shell commands. A large share of the code can be reused with minimal changes, allowing for quick prototyping of bare-bones YARN applications. To help you in getting started with understanding the internals as well as to provide templates that you can use with subsequent modification, selective fragments of code will be highlighted here. The goals of this approach are twofold: to explain the workings of the application and to allow simple modification or duplication of existing code so that you can quickly get a prototype application running.

Application Constants

The DSConstants class offers a simple way to keep track of information as environment keys that will be used in containers later. In its native form, it is designed to be a shell script run as part of Distributed-Shell. To reuse the code in Listing 11.2, simply make copies of the three string variables for each file (local resource) you plan on using in your containers.

Listing 11.2 Metadata for the DistributedShellScript public class DSConstants


public class DSConstants {

/**
* Environment key name pointing to the shell script's location
*/
public static final String DISTRIBUTEDSHELLSCRIPTLOCATION = "DISTRIBUTEDSHELLSCRIPTLOCATION";
/**
* Environment key name denoting the file timestamp for the shell script.
* Used to validate the local resource.
*/
public static final String DISTRIBUTEDSHELLSCRIPTTIMESTAMP = "DISTRIBUTEDSHELLSCRIPTTIMESTAMP";
/**
* Environment key name denoting the file content length for the shell script.
* Used to validate the local resource.
*/
public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN";
}


Strictly speaking, these environment variables are just a simple way of passing static information like file timestamps and file lengths to the ApplicationMaster so that it can, in turn, set the same information as part of launching containers. A more complex application may have more static data as well as dynamic information that needs to be passed. Using environment variables is one way of achieving this outcome. Other possibilities are to use configuration files that are distributed as local resources and shared services (like HDFS) that the client and ApplicationMaster can access.

Client

The Client class is fundamentally responsible for launching the ApplicationMaster, which will then schedule and run the shell commands in each container. The Client class performs three major tasks to get user-supplied executables to run in their own containers:

1. Launch the Client CLI (command-line interface)

2. Manage additional local resources

3. Set up the ApplicationMaster environment

The Client CLI obtains the application jar file, the location of a user shell script, a custom log-configuration file, and other information from the user. Listing 11.3 checks whether a jar file was passed to the CLI and adds its results to a variable.

Listing 11.3 Adding the appMasterJar to the CLI


if (!cliParser.hasOption("jar")) {
throw new IllegalArgumentException("No jar file specified for application master");
}
appMasterJar = cliParser.getOptionValue("jar");


The CLI parsing code can be modified if necessary to add more libraries for execution as containers. By duplicating how the ApplicationMaster jar file is added from the command-line parser, you can quickly add new configuration features to the Client class.

After checking for valid input from the CLI, the client needs to connect to the ResourceManager for application submission. At this point, the client can request additional cluster information like the maximum container size, together with a new ApplicationId. In the Distributed-Shell application, the YarnClient library is used to create an instance of YarnClientApplication that encompasses this information.

The client then creates an ApplicationSubmissionContext that specifies the application details such as ApplicationId, an application name, the priority assigned to the application, and the submission queue’s name. The ApplicationSubmissionContext record also requires a ContainerLaunchContext that describes the container under which the ApplicationMaster itself is launched. Inside the ContainerLaunchContext, we define the resource requirements for the ApplicationMaster container; the local resources (jar files, configuration files), the execution environment for the container, and the commands that must be executed to start the ApplicationMaster container.

Because the ApplicationMaster does not make use of the libraries required by the containers that run the user scripts, there is no need to make them available to the ApplicationMaster container itself; they need only be uploaded to HDFS. An example is the way Distributed-Shell makes a shell script available to the actual containers. Listing 11.4 is a simple snippet that shows how to upload your container libraries into HDFS.

Listing 11.4 Code to upload your container libraries to HDFS


String hdfsShellScriptLocation = "";
long hdfsShellScriptLen = 0;
long hdfsShellScriptTimestamp = 0;
if (!shellScriptPath.isEmpty()) {
Path shellSrc = new Path(shellScriptPath);
String shellPathSuffix =
appName + "/" + appId.getId() + "/"
+ (Shell.WINDOWS ? windowBatPath : linuxShellPath);
Path shellDst =
new Path(fs.getHomeDirectory(), shellPathSuffix);
// Copy the file to DFS
fs.copyFromLocalFile(false, true, shellSrc, shellDst);
// Record its metadata
hdfsShellScriptLocation = shellDst.toUri().toString();
FileStatus shellFileStatus = fs.getFileStatus(shellDst);
hdfsShellScriptLen = shellFileStatus.getLen();
hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
}


Although the ApplicationMaster does not use the libraries itself, it will be launching the final containers that use these libraries. Because of this, the ApplicationMaster needs to know the metadata—that is, where the libraries are on HDFS, the last modification time of the file, and the file content length. After the Client uploads the libraries, this metadata can be collected and made available as environmental properties in the ApplicationMaster’s container (or via separate configuration files, as we hinted earlier). The DSConstants class is used to manage these environment properties. Listing 11.5 shows how the shell script’s metadata is stored into the container environment.

Listing 11.5 Code for storing the shell script’s metadata into the container environment


Map<String, String> env = new HashMap<String, String>();
// put location of shell script into env
// using the env info, the application master will create the correct local resource for the
// eventual containers that will be launched to execute the shell scripts
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP,
Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN,
Long.toString(hdfsShellScriptLen));


The resources needed by the AM container itself can be added to the local resources in the ApplicationSubmissionContext as shown in Listing 11.6.

Listing 11.6 Adding local resources to the AM container


if (!shellCommand.isEmpty()) {
addToLocalResources(fs, null, shellCommandPath, appId.getId(),
localResources, shellCommand);
}

private void addToLocalResources(FileSystem fs, String fileSrcPath,
String fileDstPath, int appId, Map<String, LocalResource> localResources,
String resources) throws IOException {
[ . . . . ]
FileStatus scFileStatus = fs.getFileStatus(dst);
LocalResource scRsrc =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromURI(dst.toUri()),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
scFileStatus.getLen(), scFileStatus.getModificationTime());
localResources.put(fileDstPath, scRsrc);
}


Finally, the ApplicationSubmissionContext is submitted to the ResourceManager and the client then monitors the application by requesting a periodic ApplicationReport from the ResourceManager (Listing 11.7).

Listing 11.7 Monitoring the progress of the application


private boolean monitorApplication(ApplicationId appId)
throws YarnException, IOException {
while (true) {

// Check app status every 1 second
Thread.sleep(1000);

// Get application report for the appId we are interested in
ApplicationReport report = yarnClient.getApplicationReport(appId);

YarnApplicationState state = report.getYarnApplicationState();
if (YarnApplicationState.FINISHED == state) {
LOG.info("Application has completed successfully. Breaking monitoring loop");
return true;
}
[ . . . ]
}
}


Mimicking existing distributed-shell utilities, if the application is taking an excessive amount of time to execute, the client kills the application.


Note

If you want to create a long-running application that is not killed via the built-in distributed shell timeout, comment out the code in Listing 11.8 in the client.


Listing 11.8 Application Timeout code block


if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
LOG.info("Reached client specified timeout for application. Killing application");
forceKillApplication(appId);
return false;
}


ApplicationMaster

The ApplicationMaster begins by registering itself with the ResourceManager (Listing 11.9), and then sends a heartbeat to the ResourceManager at regular intervals to indicate that it is up and alive. When you are writing your own application, it is important that the ApplicationMaster register itself immediately so that the ResourceManager does not think it has failed to start and, therefore, kill the AM container.

Listing 11.9 Registration with the ResourceManager


AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();

RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);


The ApplicationMaster’s init method initializes the DistributedShellScript variables by accessing the parameters set in its own environment by the Client class. These parameters are then later used in creating LocalResources for the final containers that are launched by the ApplicationMaster. The code in Listing 11.10 initializes metadata for LocalResources used in the final containers.

Listing 11.10 Initializing metadata for LocalResources used in the final containers


if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);

if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
shellScriptPathTimestamp = Long.valueOf(envs
.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
}
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
shellScriptPathLen = Long.valueOf(envs
.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
}
}


To perform the requisite processing, the ApplicationMaster must request containers from the ResourceManager. This request is made using a ResourceRequest with specific entries for each resource—the node location, memory, or CPU needs of the container. Most of this information is user input that already exists as part of the Distributed-Shell Client CLI parameters that eventually get passed to the ApplicationMaster. The ResourceManager responds with a set of newly allocated containers, as well as current state of freely available resources.

For each allocated container, the ApplicationMaster then sets up the necessary launch context via ContainerLaunchContext to specify the allocated container ID, local resources required by the executable, the environment to be set up for the executable, commands to execute, and so on.

Inside the run() method of the ApplicationMaster, the ContainerLaunchContext sets up its Environment and LocalResources for the final containers. One can modify the code used for the DistributedShellScript and adjust it so that it points to the libraries’ metadata that was populated from the DSConstants environmental variables. You can also easily add new environment parameters (e.g., the ApplicationMaster hostname) by creating an instance of HashMap<String, String>, populating it with key-value pairs, and finally callingsetEnvironment(HashMapObj), thereby adding the parameters to the final containers’ environment. If you are using jar, zip, or tar files that require expansion inside the container, changing the LocalResourceType.FILE to ARCHIVE will automatically decompress the archive into a folder named after the file. The code in Listing 11.11 adds environment and file data to the final container.

Listing 11.11 Adding environment and file data to the final container resources


// Set the environment
ctx.setEnvironment(shellEnv);

// Set the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

// The container for the eventual shell commands needs its own local
// resources too.
// In this scenario, if a shell script is specified, we need to have it
// copied and made available to the container.
if (!shellScriptPath.isEmpty()) {
LocalResource shellRsrc = Records.newRecord(LocalResource.class);
shellRsrc.setType(LocalResourceType.FILE);
shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
try {
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
shellScriptPath)));
} catch (URISyntaxException e) {
LOG.error("Error when trying to use shell script path specified"
+ " in env, path=" + shellScriptPath);
e.printStackTrace();

// A failure scenario on bad input such as invalid shell script path
// We know we cannot continue launching the container
// so we should release it.
numCompletedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
return;
}
shellRsrc.setTimestamp(shellScriptPathTimestamp);
shellRsrc.setSize(shellScriptPathLen);
localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
ExecShellStringPath, shellRsrc);
shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
}
ctx.setLocalResources(localResources);


The last potential area for modification of the ApplicationMaster code lies in the container launch commands. It is here that setting up particular class libraries, running code, and performing other actions can be taken inside the containers. Anything that can be run from the Linux or Windows command line can be run as a command in the containers. If, for example, you wanted to run a Runnable.jar that you have uploaded as a library up to this point, you can do so with the following code:

vargs.add(Environment.JAVA_HOME.$() + "/bin/java -jar Runnable.jar");


Note

Multiple commands can be stacked for execution when the container starts. To run multiple commands in sequence, run the commands by making use of semicolons. Other shell syntax will work as well. For example, this multistep command can be set up to run at container launch:

'cd UnTarDirectory; mkdir testDir; mv cmd1.sh testDir/; cd testDir; sh cmd1.sh'


Listing 11.12 adds the actual application commands to the launch containers.

Listing 11.12 Adding commands to the launch containers


// Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);

// Set executable command
vargs.add(shellCommand);
// Set shell script path
if (!shellScriptPath.isEmpty()) {
vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
: ExecShellStringPath);
}

// Set args for the shell command if any
vargs.add(shellArgs);
// Add log redirect params
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");

// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
ctx.setCommands(commands);


The ApplicationMaster finally talks to the appropriate NodeManager to launch the container (Listing 11.13). It uses the NMClient library to do so.

Listing 11.13 Launching the container


containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
[ . . . ]

containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);


The ApplicationMaster can monitor the launched container by either querying the ResourceManager using ApplicationMasterProtocol.allocate() API to get updates on completed containers or using the ContainerManagementProtocol.getContainerStatus() API to query for the status of the allocated container’s ContainerId. Most applications, including Distributed-Shell, do the former.

After its work is completed, similar to application registration, the AM container sends a FinishApplicationMasterRequest to the ResourceManager to inform it about the container’s status.

Final Containers

The final containers launched by the ApplicationMaster can be anything that the Container start-up commands can execute—for example, Python programs, Perl, Java, C++, shell commands, and more. Often, the real challenge is not launching the containers to execute the code, but rather coordinating the distributed nature of the containers. Numerous applications can be made to operate in a distributed parallel fashion with minimal changes. This aspect promotes reuse of existing codebases to run as YARN applications. In many cases, only the parallelism logic needs to be created and added to the main application code.

Wrap-up

The Distributed-Shell represents one of the first YARN application frameworks that does not run as a MapReduce application. Although it might not add a lot of real-world utility to existing parallel-shell utilities, it serves as an excellent starting point for building new Hadoop YARN applications. Using the code examples provided in this chapter, you can easily modify the Distributed-Shell application and explore writing your own YARN applications. In addition, using Distributed-Shell to probe the YARN execution process itself helps provide insight into how YARN runs distributed applications.