Chapter 2. Data Collection and Data Analysis with AWS

Now that we’ve covered the basics of AWS and Amazon EMR, you can get to work using Amazon’s tools in the cloud. To get started, you’ll create some sample data for your first Amazon EMR job to parse. A number of AWS tools and techniques will be required as part of this exercise to move the data to a location that Amazon EMR can access and work on. This should give you a solid background on what is available, how to begin thinking about your data, and how to overcome the challenges of moving your data into AWS.

Amazon EMR is built with many of the core components and frameworks of Apache Hadoop. Apache Hadoop allows organizations to build data-intensive distributed applications across a cluster of low-cost hardware. Amazon EMR simply takes this technology and moves it to the Amazon cloud to run at web scale on Amazon’s AWS hardware.

The key to all of this is the MapReduce framework. MapReduce is a powerful framework used to break down large data sets into smaller sets that can be processed in Amazon EMR across the multiple EC2 instances that compose a cluster. To demonstrate the power of this concept, in this chapter you’ll create an Amazon EMR cluster, also known as a Job Flow, in Java. The Job Flow will determine message frequency for the test sample data set. Of course, as with learning anything new, you are bound to make mistakes and errors in the development of an Amazon EMR Job Flow. Toward the end of the chapter, we will intentionally introduce a number of errors into the Job Flow so you can step through the process of exploring Amazon EMR logs and tools. This process can help you find errors and resolve problems in your own Amazon EMR application.

Log Analysis Application

Now let’s focus on building a number of the components of the log analysis application described in Chapter 1. You will create your data set in the cloud on a Linux system using Amazon’s EC2 service. Then the data will be moved through S3 to be processed by an application running on the Amazon EMR cluster, and in the end the processed result set will show the error messages and their frequency. Figure 2-1 shows the workflow of the system components that you’ll be building.

Figure 2-1. Application workflow covered in this chapter

Log Messages as a Data Set for Analytics

Since the growth of the Internet, the amount of electronic data that companies retain has exploded. It is only recently, with the advent of tools like Amazon EMR, that companies have gained the means to mine and use their vast data repositories. Companies are mining their data sets to gain a competitive advantage over their rivals and to learn what matters most to their customer base. The growth in this field has put data scientists and individuals with data analytics skills in high demand.

The struggle many have faced is how to get started learning with these tools and access a data set of sufficient size. This is why we have chosen to use computer log messages to illustrate many of the points in the first Job Flow example in this chapter. Computers are logging information on a regular basis, and the logfiles are a ready and available data source that most developers understand well from troubleshooting issues in their daily jobs. Computer logfiles are a great data source to start learning how to use data analysis tools like Amazon EMR. Take a look at your own computer—on a Linux or Macintosh system, many of the logfiles can be found in /var/log. Figure 2-2 shows an example of the format and information of some of the log messages that you can find.

Figure 2-2. Typical computer log messages

If this data set does not work well for you and your industry, Amazon hosts many public data sets that you could use instead. The data science website Kaggle also hosts a number of data science competitions that may be another useful resource for data sets as you are learning about MapReduce.

Understanding MapReduce

Before getting too far into an example, let’s explore the basics of MapReduce. MapReduce is the core of Hadoop, and hence the same is true for Amazon EMR. MapReduce is the programming model that allows Amazon EMR to take massive amounts of data, break it up into small chunks across a configured number of virtual EC2 instances, analyze the data in parallel across the instances using map and reduce procedures that we write, and derive conclusions from analyses on very large data sets.

The term MapReduce refers to the separate procedures written to build a MapReduce application that perform analysis on the data. The map procedure takes a chunk of data as input and filters and sorts the data down to a set of key/value pairs that will be processed by the reduce procedure. The reduce procedure performs summary procedures of grouping, sorting, or counting of the key/value pairs, and allows Amazon EMR to process and analyze very large data sets across multiple EC2 instances that compose an Amazon EMR cluster.

Let’s take a look at how MapReduce works using a sample log entry as an example. Let’s say you would like to know how many log messages are created every second. This can be useful in numerous data analysis problems, from determining load distribution, pinpointing network hotspots, or gathering performance data, to finding machines that may be under attack. In general, these sorts of issues fall into a category commonly referred to as frequency analysis. Looking at the example log records, the date and time is the first data element in each message and notes when the message occurred, down to the second:

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: INFO: Login ...

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: INFO: Login ...

Apr 15 23:27:15 hostname.local ./generate-log.sh[17580]: WARNING: Login failed...

Apr 15 23:27:16 hostname.local ./generate-log.sh[17580]: INFO: Login ...

We can write a map procedure that parses out the date and time and treats this data element as a key. We can then use the key selected, which is the date and time in the log data, to sort and group the log entries that have occurred at that timestamp. The pseudocode for the map procedure can be represented as follows:

map( "Log Record" )

Parse Date and Time

Emit Date and Time as the key with a value of 1

The map procedure would emit a set of key/value pairs like the following items:

(Apr 15 23:27:14, 1)

(Apr 15 23:27:14, 1)

(Apr 15 23:27:15, 1)

(Apr 15 23:27:16, 1)

...

This simple map procedure parses a log line, emits the date and time as the key, and uses the numeric value of one as the value in each pair. The data set generated by the map procedure is grouped by the framework to combine duplicate keys and create an array of values for each key. The following is the final intermediate data set that is sent to the reduce procedure:

(Apr 15 23:27:14, (1, 1))

(Apr 15 23:27:15, 1)

(Apr 15 23:27:16, 1)

...

The reduce procedure determines a count of each key—date and time—by iterating through the array of values and coming up with the total number of the log lines that occurred each second. The pseudocode for the reduce procedure can be represented something like the following:

reduce( Key, Values )

sum = 0

for each Value:

sum = sum + value

emit (Key, sum)

The reduce procedure will generate a single line with the key and sum for each key as follows:

Apr 15 23:27:14 2

Apr 15 23:27:15 1

Apr 15 23:27:16 1

...

The final result from the reduce procedure has gone through each of the date and time keys from the map procedure and arrived at counts for the number of log lines that occurred in each second of the sample logfile.

Figure 2-3 details the flow of data through the map and reduce phases of a Job Flow working on the log data.

Figure 2-3. Data Flow through the map and reduce framework components

Collection Stage

To utilize the power of Amazon EMR, we need a data set to perform analysis on. Amazon EMR, like many other AWS services, uses Amazon S3 for persistent storage and data retrieval. Let’s get a data set loaded into S3 so you can start your analysis.

The collection stage is the first step in any data analysis problem. Your first challenge as a data scientist is to get access to raw data from the systems that contain it and pull it into a location where it can actually be analyzed. In many organizations, data will come in flat files, databases, and binary formats stored in many locations. Recalling the log analysis example described in Chapter 1, we know there is a wide diversity of log sources and log formats in an enterprise organization:

§ Servers (Unix, Windows, etc.)

§ Firewalls

§ Intrusion detection systems (IDS)

§ Printers

§ Proxy servers

§ Web application firewalls (WAF)

§ Custom-built software

In the traditional setting, raw data from applications, devices, and systems on an internal corporate network is fed into the data analysis system. In today’s environments, it is conceivable that the data to be processed will be distributed across internal networks, extranets, and even applications and sources already running in a cloud environment. These systems are all good and realistic sources of data for data analysis problems in an organization.

In this section, you’ll provision and start an EC2 instance to generate some sample raw log data. In order to keep the data collection simple, we’ll generate a syslog format log file on the EC2 instance. These same utilities can be used to load data from the various source systems in a typical organization into an S3 bucket for analysis.

Simulating Syslog Data

The simplest way to get started is to generate a set of log data from the command line utilizing a Bash shell script. The data will have relatively regular frequency because the Bash script is just generating log data in a loop and the data itself is not user- or event-driven. We’ll look at a data set generated from system- and user-driven data in Chapter 3 after the basic Amazon EMR analysis concepts are covered here.

Let’s create and start an Amazon Linux EC2 instance on which to run a Bash script. From the Amazon AWS Management Console, choose the EC2 service to start the process of creating a running Linux instance in AWS. Figure 2-4 shows the EC2 Services Management Console.

Figure 2-4. Amazon EC2 Services Management Console

From this page, choose Launch Instance to start the process of creating a new EC2 instance. You have a large number of EC2 instance types to choose from, and many of them will sound similar to systems and setups running in a traditional data center. These choices are broken up based on the operating system installed, the platform type of 32-bit or 64-bit, and the amount of memory and CPU that will be allocated to the new EC2 instance. The various memory and CPU allocation options sound a lot like fast food restaurant meal size choices of micro, small, medium, large, extra large, double extra large, and so on. To learn more about EC2 instance types and what size may make sense for your application, see Amazon’s EC2 website, where the sizing options and pricing are described.

Speed and resource constraints are not important considerations for generating the simple syslog data set from a Bash script. We will be creating a new EC2 instance that uses the Amazon Linux AMI. This image type is shown in the EC2 creation wizard in Figure 2-5. After choosing the operating system, we will select the smallest option, the micro instance. This EC2 machine size is sufficient to get started generating log data.

Figure 2-5. Amazon Linux AMI EC2 instance creation

After you’ve gone through Amazon’s instance creation wizard, the new EC2 instance is created and running in the AWS cloud. The running instance will appear in the Amazon EC2 Management Console as shown in Figure 2-6. You can now establish a connection to the running Linux instance through a variety of tools based on the operating system chosen. On running Linux instances, you can establish a connection directly through a web browser by choosing the Connect option available on the right-click menu after you’ve selected the running EC2 instance.

Figure 2-6. The created Amazon EC2 micro instance in the EC2 Console

WARNING

Amazon uses key pairs as a way of accessing EC2 instances and a number of other AWS services. The key pair is part of the public/private key mechanism used to secure access to your cloud resources, such as SSH access to a running Linux instance. It is critical that you keep the private key in a secure place because anyone who has the private key can access your cloud resources. It is also important to know that Amazon keeps a copy of your public key only. If you lose your private key, you have no way of retrieving it again later from Amazon.
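For example, once the instance shows a running state, you could connect from a Linux or Mac workstation over SSH using the private key downloaded during instance creation. This is only a sketch; the key filename and public DNS name below are placeholders for your own values, and ec2-user is the default login for the Amazon Linux AMI:

$ chmod 400 my-ec2-key.pem
$ ssh -i my-ec2-key.pem ec2-user@ec2-54-12-34-56.compute-1.amazonaws.com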

Generating Logs with Bash

Now that an EC2 Linux image is up and running in AWS, let’s create some log messages. The following simple Bash script will generate output similar to syslog-formatted messages found on a variety of other systems throughout an organization:

#!/bin/bash

log_message()

{

Current_Date=`date +'%b %d %H:%M:%S'`

Host=`hostname`

echo "$Current_Date $Host $0[$$]: $1" >> $2 1

}

# Generate log events

for (( i = 1; i <= $1 ; i++ )) 2

do

log_message "INFO: Login successful for user Alice" $2 3

log_message "INFO: Login successful for user Bob" $2

log_message "WARNING: Login failed for user Mallory" $2

log_message "SEVERE: Received SEGFAULT signal from process Eve" $2

log_message "INFO: Logout occurred for user Alice" $2

log_message "INFO: User Walter accessed file /var/log/messages" $2

log_message "INFO: Login successful for user Chuck" $2

log_message "INFO: Password updated for user Craig" $2

log_message "SEVERE: Disk write failure" $2

log_message "SEVERE: Unable to complete transaction - Out of memory" $2 4

done

1

Generates a syslog-like log message

2

The first parameter ($1) passed to the Bash script specifies the number of loop iterations of log lines to generate

3

The second parameter ($2) specifies the log output filename

4

The output we selected is a pseudo stream of the kinds of items you may find in a real logfile

With the Bash script loaded into the new EC2 instance, you can run the script to generate some test log data for Amazon EMR to work with later in this chapter. In this example, the Bash script was stored as generate-log.sh. The example run of the script will perform 1,000 iterations, generating 10,000 lines of log output in a logfile named sample-syslog.log:

$ chmod +x generate-log.sh

$ ./generate-log.sh 1000 ./sample-syslog.log
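Before moving on, a quick sanity check confirms the run produced the expected volume of data (a small verification step, assuming the filenames used above; 1,000 iterations of 10 messages each yields 10,000 lines):

$ wc -l sample-syslog.log
10000 sample-syslog.log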

Let’s examine the output the script generated. Opening the logfile created by the Bash script, you can see a number of repetitive log lines are created, as shown in Example 2-1. There will be some variety in the frequency of these messages based on other processes running on the EC2 instance and other EC2 instances running on the same physical hardware as our EC2 instance. You can find a little more detail on how other cloud users affect the execution of applications in Appendix B.

Example 2-1. Generated sample syslog

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: INFO: Login

successful for user Alice

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: INFO: Login

successful for user Bob

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: WARNING: Login

failed for user Mallory

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: SEVERE: Received

SEGFAULT signal from process Eve

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: INFO: Logout

occurred for user Alice

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: INFO: User

Walter accessed file /var/log/messages

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: INFO: Login

successful for user Chuck

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: INFO: Password

updated for user Craig

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: SEVERE: Disk write failure

Apr 15 23:27:14 hostname.local ./generate-log.sh[17580]: SEVERE: Unable

to complete transaction - Out of memory

Diving briefly into the details of the components that compose a single log line will help you understand the format of a syslog message and how this data will be parsed by the Amazon EMR Job Flow. Looking at this log output also helps you understand how to think about the components of a message and the data elements needed in the MapReduce code that will be written to compute message frequency.

Apr 15 23:27:14

This is the date and time the message was created. This is the item that will be used as a key for developing the counts that represent message frequency in the log.

hostname.local

In a typical syslog message, this part of the message represents the hostname on which the message was generated.

generate-log.sh

This represents the name of the process that generated the message in the logfile. The script in this example was stored as generate-log.sh in the running EC2 instance, and this is the name of the process in the logfile.

[17580]

Typically, every running process is given a process ID that exists for the life of the running process. This number will vary based on the number of processes running on a machine.

SEVERE: Unable to complete transaction - Out of memory

This represents the free-form description of the log message that is generated. In syslog messages, the messages and their meaning are typically dependent on the process generating the message. Some understanding of the process that generated the message is necessary to determine the criticality and meaning of the log message. This is a common problem in examining computer log information. Similar issues will exist in many data analysis problems when you’re trying to derive meaning and correlation across multiple, disparate systems.

From the log analysis example application used to demonstrate AWS functionality throughout this book, we know there is tremendous diversity in log messages and their meaning. Syslog is the closest thing to a standard in logging when it comes to computer logs. Many would argue that it’s a bit of a stretch to call syslog a standard, because there is still tremendous diversity in the log messages from system to system and vendor to vendor. However, a number of RFCs define the aspects and meaning of syslog messages. You should review RFC-3164, RFC-5424, and RFC-5427 to learn more about the critical aspects of syslog if you’re building a similar application. Logging and log management is a very large problem area for many organizations, and Logging and Log Management: The Authoritative Guide to Understanding the Concepts Surrounding Logging and Log Management, by Anton Chuvakin, Kevin Schmidt, and Christopher Phillips (Syngress), covers many aspects of the topic in great detail.

Moving Data to S3 Storage

A sample data set now exists in the running EC2 instance in Amazon’s cloud. However, this data set is not in a location where it can be used in Amazon EMR because it is sitting on the local disk of a running EC2 instance. To make use of this data set, you’ll need to move the data to S3, where Amazon EMR can access it. Amazon EMR will only work on data that is in an Amazon S3 storage location or is directly loaded into the HDFS storage in the Amazon EMR cluster.

Data in S3 is stored in buckets. An S3 bucket is a container for the objects, files, and directories of information that you store in it. S3 bucket names need to be globally unique, so choose your bucket name wisely. Bucket names must also follow URL naming constraints, because an S3 bucket can be referenced by URL when interacting with S3 through the AWS REST API.
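For illustration, an object in a bucket named program-emr (the bucket created in Example 2-2 below) could be referenced with either of the common S3 URL styles; the exact hostname can vary by region:

https://s3.amazonaws.com/program-emr/sample-syslog.log
https://program-emr.s3.amazonaws.com/sample-syslog.log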

You have a number of methods for loading data into S3. A simple method of moving the log data into S3 is to use the s3cmd utility, which is first configured with your AWS credentials and settings:

hostname $ s3cmd --configure

For more information on installation and configuration of s3cmd, refer to the s3cmd website. Let’s go ahead and move the sample log data into S3. Example 2-2 shows a sample usage of s3cmd to load the test data into an S3 bucket named program-emr.

Example 2-2. Load data into an S3 bucket

hostname $ s3cmd mb s3://program-emr 1

Bucket 's3://program-emr/' created

hostname $ s3cmd put sample-syslog.log s3://program-emr 2

sample-syslog.log -> s3://program-emr/sample-syslog.log [1 of 1]

988000 of 988000 100% in 0s 7.44 MB/s done

hostname $

1

Make a new bucket using the mb option. The new bucket created in the example is called program-emr.

2

An s3cmd put is used to move the logfile sample-syslog.log into the S3 bucket program-emr.

All Roads Lead to S3

We chose the s3cmd utility to load the sample data into S3 because it can be used from AWS resources and also from many of the systems located in private corporate networks. Best of all, it is a tool that can be downloaded and configured to run in minutes to transfer data up to S3 via a command line. But fear not: using a third-party unsupported tool is not the only way of getting data into S3. The following list presents a number of alternative methods of moving data to S3:

S3 Management Console

S3, like many of the AWS services, has a management console that allows management of the buckets and files in an AWS account. The management console allows you to create new buckets, add and remove directories, upload new files, delete files, update file permissions, and download files. Figure 2-7 shows the file uploaded into S3 in the earlier examples inside the management console.

Figure 2-7. S3 Management Console

AWS SDK

AWS comes with an extensive SDK for Java, .NET, Ruby, and numerous other programming languages. The SDK allows third-party applications to interact with S3 to load data and manipulate S3 objects, and it includes numerous classes for direct manipulation of the objects and structures in S3. You may note that the s3cmd source code is written in Python, and you can download the source from GitHub.

S3 REST API

S3 also has a REST API that allows for interaction with S3 using standard HTTP web service calls to manipulate S3 buckets and objects.

AWS Command Line Interface

The AWS Command Line Interface (CLI) performs many of the same functions and features as s3cmd. Files can be uploaded and downloaded from S3 buckets. The utility also supports a sync feature to keep a local repository of objects stored in S3 in sync. This utility also supports controlling other AWS services like EC2. The full list of services supported by this utility is available on the AWS CLI reference page. At the time of writing this book, this utility had only recently come out of beta. The utility does not currently support controlling Amazon EMR services.
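As a rough illustration of the CLI equivalent of the s3cmd steps shown in Example 2-2 (a sketch that assumes the CLI has already been installed and configured with your credentials via aws configure, and reuses the program-emr bucket):

$ aws s3 mb s3://program-emr
$ aws s3 cp sample-syslog.log s3://program-emr/
$ aws s3 sync ./logs s3://program-emr/logs/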

Developing a MapReduce Application

Amazon EMR and the underlying Hadoop frameworks it uses are built using the Java programming language. To turn the MapReduce pseudocode into a Custom JAR MapReduce Job Flow, you will need a system set up for Java development and the Hadoop Java JARs to build an application that Amazon EMR can consume and execute. To get ready to develop your first Amazon EMR Job Flow, review Appendix C to set up your development environment.

Custom JAR MapReduce Job

Amazon EMR provides a number of ways to write map and reduce procedures, including Hive, Streaming, Pig, and Custom JAR. A number of these Job Flow types will be covered throughout this book. Because we are programmers at heart, let’s start by using Java to write a Custom JAR for the map and reduce procedures. Each of these technologies can be used to analyze data as a computational step in an Amazon EMR cluster, and the set of steps run in an Amazon EMR cluster composes a Job Flow for analyzing a data set. Hive, Streaming, Pig, and Custom JAR are the Job Flow types that can be used as steps in an Amazon EMR cluster.

Now that the theory behind the MapReduce framework has been covered, let’s translate the pseudocode into a Custom JAR Job Flow written in Java. A JAR file is simply a compressed archive of compiled Java code. Writing Java applications for Amazon EMR follows the same pattern as writing applications for Hadoop. The code developed here will cover the map, reduce, and driver procedures. The driver procedure is the main entry point that wires together the Job Flow application and tells MapReduce the classes to use for map and reduce tasks. Translating the pseudocode into Java code creates a map function implementation as shown in Example 2-3.

Example 2-3. Mapper for counting log records per second

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reporter;

public class LogMapper extends MapReduceBase

implements Mapper<LongWritable, Text, Text, IntWritable>

{

private Text word = new Text();

private final static IntWritable one = new IntWritable( 1 );

public void map( LongWritable key,

Text value,

OutputCollector<Text, IntWritable> output,

Reporter reporter) throws IOException

{

// Get the value as a String

String text = value.toString();

// Retrieve the date and time out of the log message, first 15 characters

String SyslogDateTime = text.substring(0, 15);

// Output the syslog date and time as the key and 1 as the value

output.collect( new Text(SyslogDateTime), one );

}

}

From this example, note that there are no special AWS classes or libraries used to write the map procedure; the Mapper interface and supporting classes come from the standard Hadoop imports used in the simple LogMapper class.

The map method is passed a portion of the raw data file as input. The map method focuses on the value passed to it because this represents an individual row from the logfile. Looking at the sample data, we can see the date and time are the first 15 characters of each line of input. The map method will extract the date and time from the first 15 characters and use this as the key. The final portion of the map procedure will emit the date and time key and a value of one for each line in the logfile.

Let’s move on to the reduce procedure. The pseudocode can be translated into the reduce procedure in a similar fashion, as shown in Example 2-4.

Example 2-4. Reducer for counting log records per second

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

public class LogReducer extends MapReduceBase

implements Reducer<Text, IntWritable, Text, IntWritable>

{

public void reduce( Text key, Iterator<IntWritable> values,

OutputCollector<Text, IntWritable> output,

Reporter reporter) throws IOException

{

// Counts the occurrences of the date and time

int count = 0;

while( values.hasNext() )

{

// Add the value to our count

count += values.next().get();

}

// Output the date and time with its count

output.collect( key, new IntWritable( count ) );

}

}

The reduce method is passed an iterator for the values parameter. This iterator points to the array of values for each key the method receives. The value of each element is not relevant for the reducer in this simple example because every value is set to one. The reduce method simply iterates through and counts the number of elements in the array that are of the same key—namely, date and time.

The final piece wires all these procedures together and is the main entry point for the Job Flow. The driver method defines the map and reduce methods to use in the Amazon EMR Job Flow, as shown in Example 2-5.

Example 2-5. Driver class for the log analyzer MapReduce Job Flow

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class LogAnalysisDriver extends Configured implements Tool {

public int run(String[] args) throws Exception

{

JobConf conf = new JobConf(getConf(), getClass());

conf.setJobName("Log Analyzer");

FileInputFormat.addInputPath(conf, new Path(args[0]));

FileOutputFormat.setOutputPath(conf, new Path(args[1]));

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(LogMapper.class);

conf.setCombinerClass(LogReducer.class);

conf.setReducerClass(LogReducer.class);

JobClient.runJob(conf);

return 0;

}

public static void main(String[] args) throws Exception {

int exitCode = ToolRunner.run(new LogAnalysisDriver(), args);

System.exit(exitCode);

}

}

To use the simple log analyzer, we must compile the driver, map, and reduce methods into a JAR file and load the JAR file into an S3 bucket. In the next sections, we’ll run the methods built here against the sample log, and then run an Amazon EMR Job Flow to generate the log frequency analysis results.
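The packaging step might look something like the following sketch, which assumes the three source files sit in the current directory next to a Hadoop core JAR named hadoop-core-1.0.3.jar; the JAR name, Hadoop version, and paths will depend on the development environment described in Appendix C:

$ mkdir -p classes
$ javac -classpath hadoop-core-1.0.3.jar -d classes LogMapper.java LogReducer.java LogAnalysisDriver.java
$ jar cvf log-analyzer.jar -C classes .
$ s3cmd put log-analyzer.jar s3://program-emr/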

Running an Amazon EMR Cluster

Let’s walk through executing the simple log analyzer in Amazon EMR. Start by choosing Create Cluster from the Amazon EMR Console. As shown in Figure 2-8, the Job Flow is given a name and the S3 location to use to write any log information from the Cluster, or Job Flow, run.

Figure 2-8. Amazon EMR New Cluster creation

Under Add Step, select the step type of Custom JAR. The parameters and location of our Custom JAR are defined for the step by selecting Configure and add. Our Custom JAR is added to the EMR cluster by configuring a processing step in the Steps section. In defining the parameters for the job in Figure 2-9, we specify the JAR filename and location based on its location in S3 storage. We also define the parameters needed for the execution of the Job Flow as arguments. The first argument is the main driver class in the JAR file. As shown in Example 2-5, the remaining required parameters define the input file and the output path for the results. The sample input file, sample-syslog.log, is set as the input, and a new S3 location is defined as the output location to store the analysis results from the Job Flow. Below the step configuration is a setting to auto-terminate the cluster after our step has completed. In the examples in this book, we will set this option to yes so the EMR cluster will go away after it is finished processing. This can help to reduce the usage charges in AWS. Without setting this option, the cluster will continue running until you choose to terminate it from the EMR Console, and you will continue to accrue AWS usage charges until the cluster terminates.

Figure 2-9. Amazon EMR Job Flow step parameters
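The step configuration boils down to the JAR’s S3 location plus the list of arguments handed to the driver. A hypothetical configuration matching this example might look like the following; the JAR name, the fully qualified driver class name, and the run0 output folder are illustrative placeholders to be replaced with values from your own build and S3 bucket:

JAR S3 location: s3://program-emr/log-analyzer.jar
Arguments: com.programemr.LogAnalysisDriver
           s3://program-emr/sample-syslog.log
           s3://program-emr/run0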

For the remaining options in the Create Cluster screen, we will use the default sizing options to run the first Job Flow. At the end, as shown in Figure 2-10, the new Job Flow is created and your first Amazon EMR Job Flow is off and running in Amazon’s data center!

Figure 2-10. Amazon running the Custom JAR in the EMR Console

TIP

A common cause of Job Flow failure is the use of an S3 output location that already exists. In the examples, we chose an output location that did not already exist in the S3 bucket prior to running the Job Flow. If the output path specified in the JAR parameters already exists, in most instances it will cause the job to fail. You may experience this by trying to run the same job more than once with the exact same parameters. The first time the Job Flow is run it will succeed, but if you run it again with exactly the same parameters, all subsequent attempts will fail.
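One way to avoid this when deliberately re-running a job is to remove the old output before starting the new run, or simply point the new run at a fresh output folder. A minimal sketch, assuming the s3cmd setup from earlier and a hypothetical run0 output folder that is no longer needed:

$ s3cmd del --recursive s3://program-emr/run0/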

Viewing Our Results

After the job completes, the analysis results will be available in S3, and you can retrieve them to review the frequency counts in the log. The job will generate a part file for each reducer task that is created by Amazon EMR. In general, a reducer is run on each of the core and task nodes in the Amazon EMR cluster. Looking at the results of one of these part files, we can see that they look very similar to what we expected from the walkthrough of the MapReduce process with pseudocode:

Apr 21 19:16:38 1 50 2

Apr 21 19:16:43 159

Apr 21 19:16:44 159

Apr 21 19:16:47 160

...

1

The key selected in the mapper was the date and time of the log entry. The key is emitted in the results by the reducer.

2

The reducer counted the number of instances of each key, and the total is emitted as the second column of the result set.

The output from Amazon EMR may be one or many individual part files. The number of part files generated is related to the number of reduce processes executed in the Amazon EMR cluster. If your application calls for recombining the result set into a single consolidated file, you can accomplish this by taking the result set and loading it into an Amazon Relational Database, or running the result set through another application or Amazon EMR Job Flow.
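For small result sets, pulling the part files down and concatenating them locally is often enough. A quick sketch, again assuming the s3cmd configuration from earlier and a hypothetical run0 output folder:

$ s3cmd get --recursive s3://program-emr/run0/ ./results/
$ cat ./results/part-* > combined-counts.txt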

Debugging a Job Flow

You may be asking yourself now, “What will I do if I have an error in my application that is running in the cloud?” Fortunately, there are a number of tools and techniques available to find out more information about Amazon EMR jobs running in the cloud. In a time-honored tradition, let’s add a number of print statements to the mapper and reducer methods so we can walk through some debugging techniques.

In the mapper method in Example 2-6, a standard error output line is added to detail that the application is executing the map method of the Job Flow.

Example 2-6. Mapper with logging statements

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reporter;

public class LogMapper extends MapReduceBase

implements Mapper<LongWritable, Text, Text, IntWritable>

{

private Text word = new Text();

private final static IntWritable one = new IntWritable( 1 );

public void map( LongWritable key,

Text value,

OutputCollector<Text, IntWritable> output,

Reporter reporter) throws IOException

{

// Get the value as a String

String text = value.toString();

// Output a log message

System.err.println("We are inside the map method"); 1

// Retrieve the date and time out of the log message, first 15 characters

String SyslogDateTime = text.substring(0, 15);

// Output the syslog date and time as the key and 1 as the value

output.collect( new Text(SyslogDateTime), one );

}

}

1

A simple log statement to indicate the execution of the map routine in the log output

For the reduce method, we’ll add similar logging to the routine to indicate the execution of the reducer. In addition, we’ll intentionally introduce an arithmetic error to create a problem in the application—a division-by-zero operation will cause the reduce routine to fail during execution. Example 2-7 shows the changes made to the reduce method.

Example 2-7. Reducer that will fail with an arithmetic exception

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

public class LogReducer extends MapReduceBase

implements Reducer<Text, IntWritable, Text, IntWritable>

{

public void reduce( Text key, Iterator<IntWritable> values,

OutputCollector<Text, IntWritable> output,

Reporter reporter) throws IOException

{

// Output a log message

System.err.println("We are inside the reduce method"); 1

// Counts the occurrences of the date and time

int count = 0;

while( values.hasNext() )

{

// Output a log message

System.err.println("Uh oh! We are going to divide by zero!");

// Add the value to our count and divide by zero

count += ( values.next().get() / 0 ); 2

}

// Output the date and time with its count

output.collect( key, new IntWritable( count ) );

}

}

1

A simple log statement to indicate execution of the reducer in the log output

2

The calculation is altered to divide by zero, causing the reducer to fail

Now that the application has been modified to intentionally fail, let’s upload the new JAR file and run through a debug of the application in Amazon EMR.

Running Our Job Flow with Debugging

When creating a new Job Flow, we have the option to enable logging and debugging, and we can enable them independently. During the development phases of an application, it makes sense to enable these options to review application runs and track problems. When logging is enabled, the logs of each Job Flow are written to an S3 location that is chosen on Job Flow creation. If debugging is also enabled, Amazon EMR creates indexes of the logfiles’ contents, which enables the Debug view of steps and tasks on the Amazon EMR Management Console to review a Job Flow run.

The same initial parameters used in Figures 2-8 and 2-9 are used to start the Job Flow. When setting up the new cluster, or Job Flow, as in Figure 2-8, select a location to store the Job Flow logs. The debugging option is turned on by default, but confirm this option is enabled before starting the Job Flow.

The Job Flow happily gets started by Amazon EMR as before, but when the job finishes it does not show as Terminated - All Steps Completed as it did earlier. Reviewing the state of the Job Flow in the Amazon EMR console shows it as simply Terminated. Looking at the S3 output location, output from the failed run is not available in the run0 folder.

TIP

Enabling Job Flow logging and debugging is a great idea in development and testing. However, leaving logging and debugging turned on for production Job Flows can use up a significant amount of S3 storage for the logfile and SimpleDB indexes. These options may also greatly impact the performance of a Job Flow. Many developers will choose to use Amazon AWS libraries or third-party logging utilities to control and set logging levels for their Job Flows in production environments.

Reviewing Job Flow Log Structure

Each Job Flow in Amazon EMR is given a unique identifier. The Job Flow IDs follow the pattern of “j-XXXXXXXXXXXXX.” In Figure 2-11, the Amazon EMR console gives a number of details about the execution of a Job Flow. By clicking on the Job Flow that terminated with errors in the EMR Management Console, details including the unique ID Amazon EMR assigned to the Job Flow are displayed in the Cluster Details page.

Figure 2-11. Job Flow name in the Amazon EMR console

Looking in the S3 bucket and path that were set as the log location on the Job Flow in Amazon EMR, we can see a number of new files and folders are now available following the Job Flow execution. The folder name has the same name as the unique Job Flow ID that Amazon assigned to it in the Amazon EMR console. The directory structure of the logs for the failed Job Flow run in the example from Figure 2-11 appears in a folder named j-391947SOBCQM. The following list describes the details of the directory structure and information recorded about the Job Flow run in S3, as well as the purpose of each recorded element:

daemons

The logs from each Hadoop process are stored in this folder. There is a directory for each EC2 instance that composed our Amazon EMR cluster. The directory name is the same name as the EC2 instance used in our cluster. You can determine the purpose of each node by reviewing the logfiles in each directory. Each node can be a data node or job tracker node, which map to the core, task, and master groups discussed earlier.

jobs

The configuration settings used during Job Flow execution in the Amazon EMR cluster are available in the logs in this folder. There are also independent logfiles that detail the reduce and map execution and the number of attempts performed on each of these.

node

Node logs detail the bootstrap actions taken on each node. The directory structure starts with the same name as the EC2 instance that composed the Amazon EMR cluster. The details of the logfiles contained here can be useful if you are using a number of custom bootstrap actions in the setup of your cluster.

steps

Job Flows are broken into steps. The example Job Flow in this chapter is a very simple process that only contains two steps: the startup, which is performed by Amazon EMR on every Job Flow, and the MapReduce phase that runs the map and reduce methods. On more complex Job Flows, there may be many step directories, one for each step in the overall MapReduce Job Flow. The logs of each step have the following structure:

controller

Information about the execution of the step and the status of the step at the end of the execution.

syslog

Lists the execution of the step and the status of each task attempt. In our test, you can see each task attempt returned a “FAILED” status.

stdout

The standard output from the process run in the step.

stderr

The standard error from the process run in the step. Because the logging information was written to standard error in the map and reduce methods, the log information appears here in step 2 as well as the stack trace when the application performs a divide-by-zero operation.

task-attempts

The logs for each task attempt are stored here. The logs of each task have the following structure:

syslog

Detailed log information on the execution attempt of the task. In the recent run, a divide-by-zero exception appears in the logfile along with the stack trace of the failure to help trace the error to the line number in the code that failed.

stdout

The standard output from the process run of the attempt.

stderr

The standard error from the process run of the attempt. Because the logging information was written to standard error in the map and reduce methods, the individual log statements are visible in the output of the log information here for each attempt.

If debugging had not been enabled on the Job Flow, you’d need to review the logfiles individually to locate the application error. This is definitely possible, but because we did enable debugging, we can use the Amazon EMR console debug feature to review the logs without needing to understand the log hierarchy and execution process of the application.
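If you do want to pull an individual logfile down for closer inspection, the same S3 tools used earlier work on the log structure as well. A sketch, assuming the Job Flow log location was set to s3://program-emr/logs, using the failed Job Flow ID from Figure 2-11, and with step 2 being the Custom JAR step described below:

$ s3cmd ls s3://program-emr/logs/j-391947SOBCQM/steps/
$ s3cmd get s3://program-emr/logs/j-391947SOBCQM/steps/2/stderr ./step2-stderr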

Debug Through the Amazon EMR Console

When you review the Steps section of the Cluster Details page, you can see each step that was attempted and its status in the debug user interface in the EMR Console. From the previous section on logfiles, we know this information is stored in /jobid/steps and can also be viewed directly in S3. Figure 2-12 shows the graphical representation of the step log data in the Amazon EMR Console.

Figure 2-12. Amazon EMR Cluster Details displaying the log files and debugger actions of the failed Job Flow

Looking at each of the steps in the Cluster Details from Figure 2-12 shows that the execution of the Custom JAR application failed. The Custom JAR step represents the execution of the map and reduce methods on your syslog data. The controller, syslog, stderr, and stdout entries map directly to the log structure discussed earlier. The map and reduce methods written earlier write their log information to standard error with Java calls to System.err.println(). In reviewing the stderr logs from the example step named First Custom JAR Job Flow Step, the log output displays the exception thrown by the arithmetic error in the Job Flow, as shown in Figure 2-13.

Figure 2-13. Failed step logging output and exception

In this simple error case, this is probably enough information to help us pinpoint the problem in the application. In a real-world scenario, however, there may have been individual tasks in the Job Flow that failed due to data-specific issues. After clicking on the View Jobs option of the failed step, you see a graphical view of the job details that are stored in S3 in the jobs folder of the logs.

Drilling further down into the run of the Job Flow, you can get a view of the individual tasks that composed the Job Flow by clicking on View Tasks. The task view in the debugger is sourced from the indexed information from the log data in the jobs folder. When you look at the raw log in Figure 2-14 and compare it to the graphical view in the Amazon EMR console in Figure 2-15, it becomes evident why some may prefer to use the graphical debugger for troubleshooting.

Figure 2-14. Raw job log data with task status

You can further drill down from the tasks in Figure 2-15 to view each attempt Amazon EMR made in trying to get the Job Flow to complete. When you click View Attempts, you’ll see the familiar syslog, stderr, stdout structure from the log data under your task-attempts folder on S3 in graphical form.

Figure 2-15. Task view in the Amazon EMR debugger in Cluster Details

Here you may be able to discover situations where individual task attempts succeeded or failed, which is useful when particular data conditions cause failures only occasionally. If you click stderr, you can see the individual log messages from the execution of the map and reduce methods from the Job Flow run:

...

We are inside the map method

We are inside the map method

We are inside the map method

We are inside the map method

We are inside the map method

We are inside the reduce method

Uh oh! We are going to divide by zero!

The exception generated intentionally in the application appears under syslog. Here the stack trace of the exception lists the call tree leading up to the error, and the error can be traced back to the line of code that caused it. You can find the same information by tracing through the logs in S3, but the debugger in the Amazon EMR console allows you to conveniently drill down through the logs without needing to jump back and forth between different files in the S3 log structure.

...

2013-06-15 20:34:47,162 INFO org.apache.hadoop.io.compress.CodecPool (main): ...

2013-06-15 20:34:47,168 INFO org.apache.hadoop.mapred.TaskLogsTruncater ...

2013-06-15 20:34:47,250 INFO org.apache.hadoop.io.nativeio.NativeIO (main): ...

2013-06-15 20:34:47,250 INFO org.apache.hadoop.io.nativeio.NativeIO (main): ...

2013-06-15 20:34:47,253 WARN org.apache.hadoop.mapred.Child (main): Error ...

java.lang.ArithmeticException: / by zero

at com.programemr.LogReducer.reduce(LogReducer.java:31)

at com.programemr.LogReducer.reduce(LogReducer.java:13)

at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1436)

Our Application and Real-World Uses

We have now successfully built the first building block of the log analysis application described in Chapter 1. The application can now receive syslog-formatted log records and determine the frequency of log events using Amazon EMR to count the number of records per second.

This application is primarily focused on log analysis, but counting and frequency analysis has many known uses in other data analysis situations. The MapReduce application is performing what is considered a summarization design pattern by simply summing up the values of a common key. Other real-world applications of this technique are:

Load or usage analysis

Many times it is useful to know how many users access a server or a website throughout a time period. Web access logs or application logs that include the timestamps of user events could be imported and processed with a similar MapReduce application to determine usage frequency.

Minimum, maximum, average, and trending

From the individual number of events per second we calculated in this chapter, you could load this data into a database, Excel, or even another Amazon EMR Job Flow and determine what the maximum, minimum, and average load of events were on the server throughout the day. This same technique could be used to determine the peak traffic to a website to know if more capacity should be purchased or planned, when to have more staff available throughout the day, or what may be common slow periods so you can schedule maintenance or reduce staffing.
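As a quick illustration of that follow-on step, the summary statistics can be computed directly from a combined result file like the combined-counts.txt produced in the earlier sketch, where the count is the last whitespace-separated column on each line:

$ awk '{ c=$NF; sum+=c; if (min=="" || c<min) min=c; if (c>max) max=c } END { print "min=" min, "max=" max, "avg=" sum/NR }' combined-counts.txt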