Chapter 3. Data Filtering Design Patterns and Scheduling Work

Our initial example from the previous chapter was a fairly simple application, but by now you should understand the basics of getting an Amazon EMR job running with log data. The application only involved grouping data records based on time in order to determine the frequency of the messages we received every second. However, in many data analysis problems, you want to filter your data down to a smaller data set and focus the analysis on only the key parts of the data set that are interesting. Like our log analysis scenario, many data analysis problems focus on error scenarios and anomalies. With large data sets, this can feel like finding a needle in a haystack.

In this chapter, we’ll extend the Amazon EMR application to demonstrate a number of additional useful MapReduce patterns for filtering and analyzing data sets. In demonstrating these new building blocks, we’ll use a new data source that contains a greater variety of data than the earlier scenario. Going back to our NASA theme, you will use a web access log published by NASA and analyze this log for web server errors. The MapReduce patterns that we’ll look at will reduce the web server log data down to find requests resulting in HTTP errors on NASA’s website. Additionally, we’ll combine concepts learned in Chapters 2 and 3 to show how filtering and summarization can be used to gain greater insights into the data.

Toward the end of this chapter, we’ll look at production aspects of Amazon EMR applications, with a focus on some basic ways to schedule the data processing work with AWS services and tools. Companies that are heavy users of Amazon EMR sometimes build entire proprietary workflow systems to control, schedule, and maintain the AWS resources used by their organization. Netflix, for example, recently open-sourced its Genie system, which it built to manage its Amazon EMR clusters, Job Flows, and scheduling. There are a number of great utilities already available at AWS, including the Amazon EMR command-line interface (CLI), that you can use to achieve a number of basic Amazon EMR operational tasks without needing to build an entire workflow system yourself. To that end, we’ll do a basic walkthrough of using the Amazon EMR CLI with Unix scripts and utilities running inside an Amazon EC2 instance to demonstrate scheduling Job Flows in Amazon EMR.

In addition to the Amazon EMR CLI, this chapter explores the use of AWS Data Pipeline. Data Pipeline allows you to create workflow processes that move data between AWS services, schedule work such as Amazon EMR Job Flows for data analysis, and perform numerous other functions. We’ll use it to build a scheduling scenario for the web log filtering Job Flow created in this chapter.

Extending the Application Example

The application components in this chapter will follow the same data flow pattern covered in Chapter 2. From Chapter 1, you will recall part of the example application pulled in a data set from a web server. Web server log data will be the input into the workflow where we’ll extend the application components to do deeper analysis using MapReduce design patterns. Figure 3-1 shows the portion of our overall application and the flow of data through the system in this chapter.

Figure 3-1. Chapter application data and workflow architecture

Understanding Web Server Logs

Web servers like Apache and IIS typically log every request that users and systems make to retrieve information from a web server. Many companies today are already using their web server logs for data analysis problems. The use of these logs ranges from A/B testing of new website designs to analyzing user website actions to improve sales.

NASA published the web server logfile used in this chapter back in 1995. At the time, these web access logs were used as part of a paper entitled “Web Server Workload Characterization: The Search for Invariants” and appeared in the proceedings of the 1996 ACM SIGMETRICS Conference on the Measurement and Modeling of Computer Systems. This seems like a long time ago, but the format and meaning of the web server logs has not changed greatly over the years.

You can download the logs to use in the Amazon EMR MapReduce building blocks developed throughout this chapter. We’ll perform the analysis using the July 1995 logfile. The logfile has a good variety and diversity of successful and unsuccessful web requests made to the web server.

After downloading the web access log and opening the file, looking at the individual log records will give us a number of entries similar to the following:

piweba2y.prodigy.com - - [02/Jul/1995:00:01:28 -0400] "GET ..." 404 -
dd04-014.compuserve.com - - [02/Jul/1995:00:01:28 -0400] "GET ..." 200 7074
j10.ptl5.jaring.my - - [02/Jul/1995:00:01:28 -0400] "GET ..." 304 0
198.104.162.38 - - [02/Jul/1995:00:01:28 -0400] "GET ..." 200 11853
buckbrgr.inmind.com - - [02/Jul/1995:00:01:29 -0400] "GET ..." 304 0
gilbert.nih.go.jp - - [02/Jul/1995:00:01:29 -0400] "GET ..." 200 1204

Individual log entries follow a pretty simple format of space-delimited columns, with quotes and brackets used to further delimit columns that contain spaces in the data. Let’s first examine the meaning of each of these data elements. Looking at the data this way will help you figure out the map and reduce procedures to parse and analyze the web server log.

You won’t use every column in the log in this chapter, but the data still needs to be parsed to get to the columns used in the analysis. A single log record row breaks down into the following data elements:

piweba2y.prodigy.com - - [02/Jul/1995:00:01:28 -0400] "GET /KSC.HTML HTTP/1.0" 404 -

IP address or hostname of client: piweba2y.prodigy.com

The first element is the IP address or hostname of the client computer making a request to retrieve information from the web server. In this dated example, note that the request came from some web client inside the Prodigy network.

Identity check directive: -

This element is part of the identity check directive based on RFC 1413. In practice this data is very unreliable except in very tightly controlled networks. In the web logfile, a hyphen indicates that data is not available for this column. A common data analysis problem is having data sets with missing or invalid data values. You can use filtering to remove data with these issues to cleanse the data prior to further analysis. For now, you don’t have to worry about it, because we won’t be focusing on this column for this chapter.

User ID: -

The third column is the user ID of the user making the request to the web server. This typically requires that you enable HTTP authentication to receive this information in the log. In this example record, no data is provided for this column and a hyphen indicates the empty value received.

Date, time, and time zone: [02/Jul/1995:00:01:28 -0400]

The fourth column is the date, time, and time zone offset of when the request completed on the web server. The time zone offset of (-0400) indicates the server is four hours behind Coordinated Universal Time (UTC). UTC is similar to Greenwich Mean Time (GMT), but is not adjusted for daylight saving time. The incorporation of the time zone offset can help coordinate events across servers located in different time zones. The full date and time is enclosed in brackets ([ ]), so the data can be parsed using these delimiters to retrieve the full time field, including any spaces in the data.

Web request: "GET /KSC.HTML HTTP/1.0"

The request line received from the client is delimited by double quotes. There is a lot of useful information in the request line, including whether it was a GET, PUT, or other type of request and, of course, the path and resource being requested. In this example, the client did a GET request for KSC.HTML. This column will be used in later examples to show the requests being made that resulted in an error in the web log.

HTTP status code: 404

This is the status code that the web server sent back to the client for the request. We’ll use this later to filter out only the web server records containing requests that resulted in an error. The map procedure, shown later, will use this data to determine what data should be kept and what data should be thrown away. In general, the first digit of the status code designates the class of response from the web server. A successful response code begins with 2; a redirection begins with 3; an error caused by the web client begins with 4; and an error on the web server begins with 5. The full list of status codes is defined in the HTTP specification in RFC 2616. In this example record, a 404 response code was sent back to the client. This means the request was for something that could not be found on the web server. Isolating 404 requests could be useful in finding broken links in a website or potentially locating someone maliciously making lots of requests to find known scripts or command files that may help them gain access to a system.

Data size: -

The final data element is the size of the object returned. This is typically expressed in bytes transferred back to the client. The example record has a hyphen for the size of the data returned because the request was invalid and no object was found to return.
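
Before wiring anything into Hadoop, it can help to see this parsing in isolation. The following standalone sketch (the class name is ours, and the sample line is the record shown above) applies the same regular expression used by the mappers later in this chapter and prints the fields we care about:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogRecordParserExample {

    // Same Apache web log pattern used by the mappers later in this chapter
    private static final Pattern LOG_PATTERN = Pattern.compile(
        "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\S+)");

    public static void main(String[] args) {
        String line = "piweba2y.prodigy.com - - [02/Jul/1995:00:01:28 -0400] "
            + "\"GET /KSC.HTML HTTP/1.0\" 404 -";

        Matcher m = LOG_PATTERN.matcher(line);
        if (m.matches()) {
            System.out.println("Host:      " + m.group(1));  // piweba2y.prodigy.com
            System.out.println("Timestamp: " + m.group(4));  // 02/Jul/1995:00:01:28 -0400
            System.out.println("Request:   " + m.group(5));  // GET /KSC.HTML HTTP/1.0
            System.out.println("Status:    " + m.group(6));  // 404
            System.out.println("Size:      " + m.group(7));  // -
        }
    }
}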

Now that the layout and meaning of the new data set has been covered, let’s look at how data filtering can be done in an Amazon EMR application.

Finding Errors in the Web Logs Using Data Filtering

Data filtering is probably one of the simplest uses of the MapReduce framework. Filtering allows you to reduce a very large data set to the subset of data on which you can do further processing. The filtered data set that is returned could be large or small; the key is that the data has been filtered to support the application’s analytics.

The MapReduce framework and Amazon EMR are well suited for performing a distributed filtering task. Amazon EMR splits the web log into a number of smaller data files depending on the number of core and task nodes in your cluster. The filtering process takes each smaller file and executes the map procedure of the Job Flow. The map procedure reduces the data set to the portions of the data needed for further analytics. Figure 3-2 shows a high-level diagram of how this process works and the MapReduce filter pattern that will be implemented in this chapter.

Figure 3-2. MapReduce filter pattern for error filtering

The following pseudocode demonstrates the algorithm being implemented in the mapper method:

map( "Log Record" )
    Parse Log Record elements
    If a record contains an error
        Emit Log Record and Error Code
    Else
        Do Nothing

In this case, the map procedure only emits the records that contain an HTTP status code that indicates an error occurred in the request. If the log entry is a successful request, the record will not be emitted from the mapper for any further analysis and processing. This has the effect of throwing away all the successful web requests and only passing along the error entries to the reduce phase of the Job Flow.

For many filtering scenarios, the reduce phase may not be necessary because the map portion of the code has already done the hard work of parsing the record and filtering down the data set. Thus, the pseudocode for our reducer is very simple:

reduce( Key, Values )
    for each value
        emit (Key)

The reduce phase of the Job Flow simply removes any grouping on keys of the data received from the earlier phases of the MapReduce cycle. The original error log line is emitted back out into the final result set. The results will show up as individual part files in an S3 bucket. The number of individual part files created is based on the number of core and task nodes that run the reduce procedure in the Amazon EMR Job Flow.
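
If a single consolidated output file is easier for downstream consumers to work with, the number of reduce tasks can be set explicitly in the driver (shown later in this chapter). This is a standard Hadoop setting rather than something specific to this example; one reducer means one part file, at the cost of less parallelism in the reduce phase:

// In the driver's run() method, before JobClient.runJob(conf):
// one reduce task produces a single part file in the S3 output location
conf.setNumReduceTasks(1);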

Now that the web server log format and the MapReduce filter pattern concepts have been covered, let’s explore the actual map and reduce code needed to implement the web log filter.

Mapper Code

The mapper code looks like this:

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WebLogErrorFilterMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable>
{
    /** The number of fields that must be found. */
    public static final int NUM_FIELDS = 7;

    public void map( LongWritable key,  // Offset into the file
                     Text value,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException
    {
        // Regular expression to parse Apache Web Log
        String logEntryPattern = "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\]" +
            " \"(.+?)\" (\\d{3}) (\\S+)";

        // Get the Apache Web Log record as a String
        String logEntryLine = value.toString();

        // Compile regular expression for parsing input
        Pattern p = Pattern.compile(logEntryPattern);
        Matcher matcher = p.matcher(logEntryLine);

        // Validate we have a valid log record
        if (!matcher.matches() ||
            NUM_FIELDS != matcher.groupCount())
        {
            System.err.println("Bad log entry:");
            System.err.println(logEntryLine);
            return;
        }

        // Get the HTTP status code from the log entry
        Integer httpCode = Integer.parseInt(matcher.group(6));

        // Filter any web requests that had a 300 HTTP return code or higher
        if ( httpCode >= 300 )
        {
            // Output the log line as the key and HTTP status as the value
            output.collect( value, new IntWritable(httpCode) );
        }
    }
}

A regular expression parses the individual data elements from each log record. The map procedure examines the HTTP status code from the parsed data and will only emit records out of the map method for an HTTP status code of 300 or greater. This results in the Job Flow processing only page requests that resulted in a redirect (300-399 status codes), a client error (400-499 status codes), or a server error (500-599 status codes). The filtering is performed in parallel, as the filtering work is distributed across the individual nodes in the Amazon EMR cluster.
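
Because the mapper silently drops every successful request, the output alone does not tell you how much of the input was filtered away. One low-cost way to get that visibility, not part of the original example but a standard Hadoop facility, is to increment counters from the map method; the totals appear alongside the job’s other counters in the Job Flow logs:

// Sketch: inside WebLogErrorFilterMapper.map(), replacing the final if block
if ( httpCode >= 300 )
{
    // Track how many error records pass the filter
    reporter.incrCounter("WebLogFilter", "ERROR_RECORDS", 1);
    output.collect( value, new IntWritable(httpCode) );
}
else
{
    // Track how many successful requests were dropped
    reporter.incrCounter("WebLogFilter", "SUCCESS_RECORDS_DROPPED", 1);
}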

Reducer Code

The reducer is very simple because the data set has already been filtered down in the mapper:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WebLogErrorFilterReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable>
{
    public void reduce( Text key, Iterator<IntWritable> values,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException
    {
        // Iterate over all of the values and emit each key/value pair
        while( values.hasNext() )
        {
            output.collect( key, new IntWritable( values.next().get() ) );
        }
    }
}

A simple loop through each value passed to the reducer emits each key and value pair into the final output data set. The reduce portion is not a requirement in MapReduce and could be eliminated from this filtering Job Flow. The reduce procedure is included in the application for completeness and to undo any (unlikely) grouping that could occur if the mapper encountered duplicate log record entries.
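
If you do decide to drop the reduce pass for a pure filter, Hadoop supports map-only jobs; a minimal sketch of the driver change (the mapper output is then written directly to the output files, and no sorting or grouping occurs):

// In the driver's run() method: run a map-only job for pure filtering
conf.setNumReduceTasks(0);
// With zero reducers, do not set a combiner or reducer class;
// each mapper writes its filtered records straight to the output location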

Driver Code

The driver code does not look very different from the work done in Chapter 2. The driver is required to set the map and reduce procedures in the Job Flow. The driver, as was implemented earlier, accepts the S3 input and output locations as arguments and sets the individual map and reduce class links to set up the running of the Job Flow.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.programemr.weblog_top_ten.WebLogErrorFilterMapper;
import com.programemr.weblog_top_ten.WebLogErrorFilterReducer;

public class WebLogDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception
    {
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Web Log Analyzer");

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(WebLogErrorFilterMapper.class);
        conf.setCombinerClass(WebLogErrorFilterReducer.class);
        conf.setReducerClass(WebLogErrorFilterReducer.class);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WebLogDriver(), args);
        System.exit(exitCode);
    }
}

Running the MapReduce Filter Job

The process of running the filter Job Flow is nearly identical to the steps followed in Chapter 2. Once the compiled Java JAR and the NASA Web Log have been uploaded to an S3 bucket, you can create a new Cluster, or Job Flow, utilizing the “Create cluster” option from the Amazon EMR Management Console. The Job Flow takes parameters similar to those laid out in Figure 3-3. The parameter for the new MapReduce JAR sets the main Java class along with the input and output locations needed for starting the Job Flow processing.

Figure 3-3. Example Amazon EMR filter Job Flow step parameters

Analyzing the Results

After the Job Flow completes, you can retrieve the results from the output S3 location specified in the Job Flow parameters. The original data set contained a number of successful and failed requests, and in the end, the final data set shows the filtering that occurred and a set of results that only contains the individual error lines.

The data flow through the Map and Reduce phases can be diagrammed like the pipeline in Figure 3-4.

Figure 3-4. MapReduce Filter logical data flow

Let’s walk through what occurred in the filter Job Flow using a snapshot of some of the sample data from the NASA web logfile. The following snapshot is truncated to improve readability:

unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET ..." 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET ..." 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET ..." 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET ..." 200 4179
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET ..." 304 0
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET ..." 200 0
205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET ..." 200 3985

The mapper method parsed each field and examined the HTTP status code value, only emitting lines with a status code of 300 or greater. The entire original log line is passed as the key, and the HTTP status code that was examined by the mapper is the value. The HTTP status code emission enhances the readability of our final output because it will be placed as the last item on each output record. The output from the mapper would be similar to the following:

( burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET ..." 304 0, 304 )
( burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET ..." 304 0, 304 )

The data is further sorted and grouped by the MapReduce framework, and the reduce method will receive a set of grouped values. The log lines look the same because the GET request lines are truncated here, but the individual requests are different. There are no duplicate full log lines in the logfile, so the grouping that occurs after the mapper does not reduce the data set.

( burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET ..." 304 0, [304] )
( burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET ..." 304 0, [304] )

The simple reduce walks through the grouped values in a loop and emits each line and its HTTP status code. The final filtered results from the sample are shown here:

burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET ..." 304 0 304
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET ..." 304 0 304

Building Summary Counts in Data Sets

We have now performed two basic but very common tasks in analyzing data. In many data analysis applications, key portions of a data set are chosen via filtering and then further calculations on this smaller set of data are performed. The counting example from Chapter 2 is an example of further analysis that could be done. In the log analysis application being used in this book, we can use a combination of these two analysis techniques to derive counts on the website URL locations in the NASA logs that resulted in an error. The code we’ll show in the next section demonstrates how to combine these techniques.

Mapper Code

The incoming data is parsed into individual fields with the same regular expression as was done in Mapper Code. This time, though, the focus is on the HTTP request to specific web pages:

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WebLogErrorCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable( 1 );

    /** The number of fields that must be found. */
    public static final int NUM_FIELDS = 7;

    public void map( LongWritable key,  // Offset into the file
                     Text value,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException
    {
        // Regular expression to parse Apache Web Log
        String logEntryPattern = "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\]" +
            " \"(.+?)\" (\\d{3}) (\\S+)";

        // Get the Apache Web Log record as a String
        String logEntryLine = value.toString();

        // Compile regular expression for parsing input
        Pattern p = Pattern.compile(logEntryPattern);
        Matcher matcher = p.matcher(logEntryLine);

        // Validate we have a valid log record
        if (!matcher.matches() ||
            NUM_FIELDS != matcher.groupCount())
        {
            System.err.println("Bad log entry:");
            System.err.println(logEntryLine);
            return;
        }

        // Get the HTTP status code and request from the log entry
        Integer httpCode = Integer.parseInt(matcher.group(6));
        Text httpRequest = new Text(matcher.group(5));

        // Filter any web requests that had a 300 HTTP return code or higher
        if ( httpCode >= 300 )
        {
            // Output the page requested as the key and 1 as the value.
            // We will use the value in the reducer to sum the total occurrences
            // of the same web request and error returned from the server.
            output.collect( new Text(httpRequest), one );
        }
    }
}

The logic in the mapper pulls the HTTP status code and the HTTP request from each log entry. The map method only emits records with an HTTP status code of 300 or greater. This time, the key is the HTTP request that was made, and it is assigned a numerical value of 1 so a summation can be performed to total up the number of identical web requests.

Reducer Code

The reducer takes on the form of the summarization pattern used in Example 2-4. This is the same counting scenario used to find the frequency of log messages. The difference now is that the keys being delivered from the mapper method are a filtered set of web request errors instead of full log lines. The reducer will generate a total in the final result rather than ungrouping the data.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WebLogErrorCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable>
{
    public void reduce( Text key, Iterator<IntWritable> values,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException
    {
        // Iterate over all of the values (counts of occurrences
        // of the web requests)
        int count = 0;
        while( values.hasNext() )
        {
            // Add the value to our count
            count += values.next().get();
        }

        // Output the web request with its count (wrapped in an IntWritable)
        output.collect( key, new IntWritable( count ) );
    }
}

The driver code can be reused from our previous example in Driver Code.
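
The only changes to that driver are the class names wired into the JobConf, and, because summing counts is associative, the same reducer can safely double as a combiner. A sketch of the relevant lines, assuming the WebLogDriver shown earlier:

// Changes to the earlier WebLogDriver for the summary-count Job Flow
conf.setJobName("Web Log Error Counter");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(WebLogErrorCountMapper.class);
conf.setCombinerClass(WebLogErrorCountReducer.class); // safe: counting is associative
conf.setReducerClass(WebLogErrorCountReducer.class);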

Analyzing the Filtered Counts Job

Recall the original data set that contained successful and failed requests. In this case, a similar filtering will reduce the data set for summarization in the reduce method. Let’s walk through a sample of the data set again to review what is occurring in each of the methods with the combination of summarization and filtering. The new sample data set contains a number of rows like the following:

netcom16 ... "GET /icons/sound.xbm HTTP/1.0" 200 530
alcott2 ... "GET /shuttle/missions/sts-71/images/KSC-95EC-0868.jpg HTTP/1.0" 200 61848
www-b6 ... "GET /:/spacelink.msfc.nasa.gov HTTP/1.0" 404 -
sac1-109 ... "GET /shuttle/missions/sts-71/mission-sts-71.html HTTP/1.0" 200 12040
jfpenter ... "GET /images/launch-logo.gif HTTP/1.0" 200 1713
ts02-ind-27 ... "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 67065
sac1-109 ... "GET /shuttle/missions/sts-71/sts-71-patch-small.gif HTTP/1.0" 200 12054

In the revised mapper method, each field is parsed and examined. The HTTP access request is emitted only when the status code is 300 or greater. The HTTP request field itself is used as the key, with a value of 1, so we can count how many times the same request resulted in an error. The output of the mapper on the input file would then be similar to the following:

...
( "GET /:/spacelink.msfc.nasa.gov HTTP/1.0", 1 )
( "GET /:/spacelink.msfc.nasa.gov HTTP/1.0", 1 )
( "GET /:/spacelink.msfc.nasa.gov HTTP/1.0", 1 )
( "GET /:/spacelink.msfc.nasa.gov HTTP/1.0", 1 )
...

The data goes through the usual sorting and grouping by the MapReduce framework, and the reduce method receives a set of grouped values. A number of requests resulted in errors repeatedly in the data set, and they are grouped accordingly by the HTTP request key. The data set going to the reducer is grouped like the following example:

...
( "GET /%20%20history/apollo/apollo-13/apollo-13.html HTTP/1.0", [1] )
( "GET /%20history/apollo/apollo-13/apollo-13.html HTTP/1.0", [1, 1, 1, 1] )
( "GET /:/spacelink.msfc.nasa.gov HTTP/1.0", [1, 1, 1, 1, ...] )
( "GET /%3A/spacelink.msfc.nasa.gov HTTP/1.0", [1, 1, 1, 1, ...] )
( "GET /%7Eadverts/ibm/ad1/banner.gif HTTP/1.0", [1] )
...

The reduce method walks through the grouping that has been done in the mapper phase and adds up each value in the set. Because each of our keys is a request made to the web server and the value is simply a count of 1 for each occurrence, this has the net effect of creating a total count for each unique HTTP request that resulted in an error. The final result set looks like this:

....
GET /%20%20history/apollo/apollo-13/apollo-13.html HTTP/1.0 1
GET /%20history/apollo/apollo-13/apollo-13.html HTTP/1.0 4
GET /%3A//spacelink.msfc.nasa.gov HTTP/1.0 31
GET /%3A/spacelink.msfc.nasa.gov HTTP/1.0 36
GET /%7Eadverts/ibm/ad1/banner.gif HTTP/1.0 1
....

NOTE

This book focuses primarily on the use of AWS and Amazon EMR to help you learn more about how you can build your application in Amazon’s cloud. However, a solid understanding of MapReduce and software development patterns is a good foundation to start with before building an application of your own. To that end, we recommend MapReduce Design Patterns: Building Effective Algorithms and Analytics for Hadoop and Other Systems (O’Reilly) by Donald Miner and Adam Shook—it’s an excellent resource to learn more about MapReduce patterns that could be relevant to any MapReduce project you start.

Job Flow Scheduling

Most of the items covered in the earlier chapters revolved around creating one-time runs of Job Flows. In a real-world operational scenario, the application will likely need to be a Job Flow that runs on a scheduled basis and processes new data as it becomes available.

The real strength of MapReduce and Amazon EMR is the ability to process large volumes of data. However, data may not always be available, or the time needed to load all required data into the cloud may necessitate processing the data in bulk on an hourly, daily, or weekly basis. This can also help to control the costs of running your Amazon EMR cluster.

The Amazon EMR Management Console allows new Job Flows to be created manually for one-time execution or long-running clusters. However, it does not have a scheduling option available. There are currently two major options available from Amazon: the Amazon EMR CLI can be used to control existing Job Flows and create new ones, or AWS Data Pipeline can be used to create and schedule a full workflow of AWS services, including Amazon EMR Job Flows.

Scheduling with the CLI

You can download the Amazon Elastic MapReduce Ruby client utility (Amazon EMR CLI) from Amazon’s Developer Tools site. The utility can be run from anywhere, including other running EC2 instances you may already have provisioned in AWS. If you decide to run the utility on an EC2 instance, the Ruby programming language prerequisite is preloaded for you. However, the Amazon EMR CLI tool itself is not preloaded on the Amazon Machine Images (AMI), so you will need to upload and configure it to your running EC2 instance.

The Amazon EMR CLI provides a number of useful features as well as a number of features that are not directly available from the Amazon EMR Management Console. This section will focus on new Job Flow creation, but you may find each of the following options useful in the operation and control of Job Flows. Many of these options are available in Amazon’s AWS API, but you can perform them using this utility without needing to be a programmer.

Create a new Job Flow: --create

This option allows you to create a new Job Flow from the command line, performing the same function as selecting Create New Job Flow in the Amazon EMR Management Console.

Create a Job Flow that stays running: --alive

The alive option allows your Amazon EMR cluster to continue running after it has completed all the steps in a Job Flow. This option is available under the Advanced Options and is called Keep Alive when you are creating a Job Flow from the Console. This may be a useful feature if you want to add work, also known as steps, to an already running Amazon EMR cluster. You will need to specifically terminate the Job Flow if this option is used. You can terminate Job Flows from the Management Console or by using the Terminate option from the Amazon EMR CLI.

Resize a running Job Flow: --modify-instance-group, --add-instance-group

When a new Job Flow is created from the Amazon EMR console, there is no way to change it or resize it from the user interface. If the initial Job Flow is too small and is taking too long to complete, the only option from the Management Console is to terminate it and restart the work. You can add task nodes using the --add-instance-group option or additional nodes to any of the group types in the Amazon EMR cluster using the --modify-instance-group option. Technically, the --modify-instance-group option allows an EMR cluster to be increased or decreased in size, but decreasing the number of Core or Master nodes from a running Job Flow can lead to data loss and cause the Job Flow to fail.

Adding JAR steps to Job Flows: --jar, --main-class

These options are used on Job Flow creation, but can also be used to add steps to an already running Job Flow. This can be useful when the --alive option is used on Job Flow creation and additional work needs to be added to an already running Amazon EMR cluster. The ability to add additional steps is not available from the Amazon EMR Management Console. The --jar and --main-class options are used for custom JAR MapReduce applications like the Job Flows demonstrated so far in this book. There are other corresponding command-line options if other Job Flow types are used.

Copying and retrieving files directly: --put, --get

The --put and --get options allow direct interaction with files on the Amazon EMR master node. If an application wants to bypass S3 and place work directly on the cluster, retrieve results, or do any custom functionality on the cluster directly, these options may be useful. In this book, we stick to the out-of-the-box functionality available in S3 and Amazon EMR rather than direct manipulation of the cluster.

TIP

Amazon is trying to combine a number of its AWS command line-utilities under a single AWS command-line interface. This utility will allow you to use a single command-line utility to control many of the services you use at Amazon. Another great benefit of this utility is that it comes preloaded on the EC2 AMI and simply needs to be configured on any running EC2 instances. At the time of writing, the AWS command-line interface was released as a developer preview and was not mature enough for inclusion.

In the process of demonstrating Job Flow creation we will focus on the Amazon EMR CLI --create option. The examples will mimic similar creation and execution processes that were done manually in the Management Console in earlier chapters. Let’s walk through a simple example of scheduling the MapReduce application from this chapter with cron and the Amazon EMR CLI.

To start, you need to create a script to start a new Job Flow. The script will be the input to a Unix cron schedule as shown here:

#!/bin/bash
~/elastic-mapreduce --create \
  --name "Filter Example Flow" \
  --num-instances 3 \
  --instance-type m1.small \
  --jar s3n://program-emr/weblog-filter.jar \
  --arg com.programemr.weblog_top_ten.WebLogDriver \
  --arg s3n://program-emr/NASA_access_log_Jul95 \
  --arg s3n://program-emr/run0

Let’s review the key options used in the script:

--num-instances

Specifies the number of EC2 instances to use in the Amazon EMR cluster, similar to the manual runs in the Amazon EMR Management Console. These instances are broken out into master and core groups for the cluster.

--instance-type

The type of EC2 instances to use for the Job Flow.

--jar

The custom JAR file to use for the Job Flow.

--arg

The list of arguments used in our previous examples to specify the main driver class and the input and output S3 locations.

Let’s run the script manually first and see what happens. You should see output from the script similar to the following:

[ec2-user@ip-10-1-1-1 ~]$ ./ScheduleJobFlow.sh
Created job flow j-18EXVE5FLOWH1
[ec2-user@ip-10-1-1-1 ~]$

The script was saved on the EC2 instance with the name ScheduleJobFlow.sh.

The Job Flow starts and will appear in the Amazon EMR Management Console just like the Job Flow executions that were done earlier. The Amazon EMR CLI outputs the internal ID of the Job Flow created. We can use this later to review log output in S3 and to terminate the Job Flow using the Amazon EMR CLI Terminate option.

To schedule the Job Flow to run every hour, you can configure cron to execute the script. Cron is a Linux utility that is already part of the EC2 instance and most Unix and Linux systems. This book will not go into all the details of cron, but Linux Desktop Hacks: Tips & Tools for Customizing and Optimizing your OS, by Nicholas Petreley and Jono Bacon (O’Reilly), covers many of the basics for automating scripts.

Following is a simple example of a crontab entry for scheduling the script at the beginning of the hour. This entry can be added to the file /etc/crontab to schedule the script with the cron daemon:

0 * * * * /home/ec2-user/ScheduleJobFlow.sh

The Job Flow will now be started every hour and will run to completion similar to earlier manual executions performed from the Amazon EMR Management Console. We’ve now automated the manual Job Flow creation from earlier with minimal scripting and the use of cron as a scheduler.
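
If your scheduling logic already lives inside a Java application, the same Job Flow can be created programmatically with the AWS SDK for Java instead of shelling out to the Ruby CLI. The following is a minimal sketch, assuming the SDK is on the classpath; the class name and credential handling are ours, and the S3 paths mirror the CLI script above:

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

public class CreateFilterJobFlow {

    public static void main(String[] args) {
        // Replace with your own AWS credentials
        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(
            new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"));

        // The same custom JAR step the CLI script submits
        HadoopJarStepConfig jarStep = new HadoopJarStepConfig()
            .withJar("s3n://program-emr/weblog-filter.jar")
            .withArgs("com.programemr.weblog_top_ten.WebLogDriver",
                      "s3n://program-emr/NASA_access_log_Jul95",
                      "s3n://program-emr/run0");

        RunJobFlowRequest request = new RunJobFlowRequest()
            .withName("Filter Example Flow")
            .withInstances(new JobFlowInstancesConfig()
                .withInstanceCount(3)
                .withMasterInstanceType("m1.small")
                .withSlaveInstanceType("m1.small"))
            .withSteps(new StepConfig("Web Log Filter", jarStep));

        RunJobFlowResult result = emr.runJobFlow(request);
        System.out.println("Created job flow " + result.getJobFlowId());
    }
}

A scheduled task inside your application (or a cron entry invoking this class) could then take the place of the shell script, while everything else in this section stays the same.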

Scheduling with AWS Data Pipeline

Automation gets much more complicated in scenarios that involve the coordination of actions on multiple AWS services, S3 object manipulation, and reattempting processes on failures. A more real-world scenario for a Job Flow is outlined in Figure 3-5.

Figure 3-5. Job Flow automation with multiple services and dependencies

Prior to AWS Data Pipeline, accomplishing many of the items in this scenario required you to write numerous scripts, AWS utilities, and additional new applications using the AWS SDK. With the release of AWS Data Pipeline, you can achieve this workflow using this single web service from Amazon.

It is worth noting before you choose to use Data Pipeline that, at the time of this writing, the service was only available in the US East AWS region. As with most AWS services, Amazon typically releases new features and functionality into the US East region first and rolls them out to other AWS regions over time. The Amazon EMR CLI covered earlier can be used to create new Job Flows in any of the AWS regions in which you decide to run your Job Flow. Depending on where you run your MapReduce application, this may limit your ability to use AWS Data Pipeline for scheduling your Job Flows. Still, we see real value and potential in this tool even this early in its product life cycle. AWS Data Pipeline can reduce the operational resources needed to maintain AWS resources by removing much of the additional scripting and applications noted earlier. The potential benefits of this service warrant Data Pipeline’s inclusion as a tool to consider in planning your project.

Creating a Pipeline

Like in all AWS services, the first place to start is the Management Console. Choosing Create Pipeline starts the process of creating a new pipeline. Figure 3-6 shows the initial AWS Data Pipeline screen and the example settings used to start the creation of the pipeline in this section.

Figure 3-6. AWS Data Pipeline creation settings

Let’s review the settings chosen on this initial pipeline setup:

Pipeline name

This is the name that will appear on the Data Pipeline Management Console and is a user-configured value. Choosing a name that represents the purpose of the pipeline is recommended.

Pipeline description

This is the description of what the pipeline will be used for, and can be anything that helps describe the pipeline to others who will need to maintain and troubleshoot it.

Schedule

There are two options: Time Series Style Scheduling and Cron Style Scheduling. Time Series Style Scheduling will schedule pipeline items to run after the specified period of time has elapsed. Suppose an item in the pipeline is scheduled for January 1, 2013, at midnight and it should run every hour. With Time Series Style Scheduling, the first time the pipeline item would execute is January 1, 2013, at 1:00 A.M., or after one hour has passed. With Cron Style Scheduling, a data pipeline item will be scheduled at the beginning of a specified period. Using the same scenario, if an item in the pipeline is scheduled for January 1, 2013, at midnight and should run every hour, the first time the pipeline item would execute is January 1, 2013, at midnight. Cron Style Scheduling was chosen in this walkthrough to mimic the scheduling done earlier using cron with the Amazon EMR CLI.

Role

Role controls permissions and security between other AWS services. The security role will be used for any actions taken by pipeline objects. Role settings become important when you are integrating multiple AWS services running at different levels of permission. For this example, we chose the default role setting.

Adding Data Nodes

Choosing Create Pipeline creates an empty data pipeline that is ready to be set up as a workflow similar to what is described in Figure 3-5. We will develop the workflow using data nodes and activities. Data nodes represent the S3 locations the input file is moved to for processing later by Amazon EMR. Data nodes can also represent other Amazon data storage services like DynamoDB or a MySQL database.

The activities represent the actions that will be performed in the pipeline. Copying the input file between S3 locations and processing the data using Amazon EMR will be translated into activities in the pipeline. To start, let’s create the input file data node in Figure 3-7.

Figure 3-7. Data Pipeline input file data node

In the workflow shown in Figure 3-7, we need to specify the input file on S3 and check that the file exists. These are added to the data node via the “Add an optional field” feature, using the File Path and Preconditions fields. There are numerous other fields that can be added, like Directory Path, which can be useful on data nodes. You can explore and learn more about the Data Pipeline fields and any new additions in the AWS Pipeline Definition Reference. The schedule and precondition are separate objects that are created in the user interface and can be reused in other data nodes and activities throughout the pipeline. Figure 3-8 shows the addition of the new fields and the creation of the precondition object.

Figure 3-8. Data Pipeline creating precondition

The precondition you created verifies that the input file exists before proceeding further. This is a useful check so that the later portions of the workflow will not get invoked and use EC2 or Amazon EMR computing hours unless there is work ready to be processed. After creating the new precondition, you can select it in the “Precondition objects” panel for additional configuration. Use the check type S3KeyExists to verify that the NASA logfile exists and is ready to be used in the rest of the pipeline. Figure 3-9 shows the completed precondition object.

Figure 3-9. Data Pipeline completed precondition object

The schedule object is set up in a similar fashion to the precondition object and will appear in the Schedules panel. The configuration of a schedule is relatively straightforward and does not require the selection of optional fields. Configuration involves setting a start date and an hourly time period (once an hour) to set up a schedule similar to our earlier command-line example. You could add an optional field, End Date, if you needed to limit a pipeline object to a range of dates.

Every data node and activity has a Schedule field and can have separate and different schedules through the execution of the pipeline. It is easy to imagine a different scenario than what we’ve laid out here. Perhaps your scenario involves input files being copied once an hour and only needing to run the Amazon EMR Job Flow once a day. A requirement like this would necessitate that you create more than one schedule object and set it on activities and data nodes through the pipeline.

The first data node should look similar to Figure 3-10. Throughout the process, you can use the Save Pipeline option to verify there are no missing fields or errors in the pipeline. Any errors or warnings found during saving will appear in the Errors/Warnings panel.

Figure 3-10. Data Pipeline completed input node

In the scheduling scenario, the input file is moved from the location used in earlier examples to a new S3 location for processing. Traditional data processing scenarios move input files to a new location for processing and then archive or delete the processed files. To simulate a similar scenario, let’s create an output data node as the output location for an S3 object copy activity. This location will be specified as the input parameter for the Amazon EMR Job Flow. The output data node is set up with the parameters in Table 3-1 to match the section’s scheduling scenario.

Table 3-1. Output data node scenario settings

Directory Path: s3://program-emr/input
Name: Web log processing location
Type: S3DataNode
Schedule: Schedule created earlier for once an hour

Now you need to create an activity to perform the actual S3 file copy.

Adding Activities

Activities perform actions on the data nodes or other AWS services. They need to run on EC2 instances or Amazon EMR clusters, and these services can be set up as resources in the Data Pipeline. In the scenario being built here, there are two activities. First, the input file is copied to the processing location and then, upon successful completion of the S3 copy, an Amazon EMR Job Flow picks up the input data and processes it.

You can add these two activities using the Add Activity button. Table 3-2 shows the settings used for the S3 copy activity node and the resources used to run it.

Table 3-2. Activity settings for copying input to new S3 location

Runs On: EC2 S3 copy resource
Name: Copy input file to processing location
Type: CopyActivity
Output: Web log processing location
Schedule: Schedule created earlier for once an hour
Input: Web log input file S3 location

The S3 copy activity wires together the input and output Data Nodes we set up earlier. Data Pipeline will draw this relationship in the diagram, similar to what you see in Figure 3-11, indicating the direction in which the S3 object will be copied. The result begins to look like a flowchart.

The Runs On field should be configured to the separate resource object that will be used, and you can reuse this throughout the pipeline. The settings for the EC2 resource on the new activity are shown in Table 3-3.

Table 3-3. Resource settings for the EC2 resource

Instance Type: m1.small
Name: EC2 S3 copy resource
Type: Ec2Resource
Role: DataPipelineDefaultRole
Resource Role: DataPipelineDefaultResourceRole
Schedule: Schedule created earlier for once an hour
Log Uri: s3://program-emr
Terminate After: 30 minutes

The Instance Type is set to a minimum setting of m1.small due to the limited resources needed to copy data in S3. This can be increased for special situations to improve performance, but in most scenarios a small EC2 instance to move files in S3 should be sufficient.

The Log Uri field writes logs to the S3 bucket location specified. This field and the Terminate After field were suggested additions made by Data Pipeline, but they are not required; the suggestions appear as warnings when the pipeline is saved. Log Uri aids in troubleshooting if there are issues with the activity, and the Terminate After option allows you to set an upper time limit for the copy operation to complete. This will prevent any runaway EC2 instances due to pipeline failures or S3 issues.

The configuration of the Amazon EMR activity follows a similar pattern, as shown in Table 3-4.

Table 3-4. Activity settings for the Amazon EMR Job Flow

Runs On: Amazon EMR cluster
Name: Amazon EMR web log filter
Type: EmrActivity
Step: s3://program-emr/weblog-filter.jar,com.programemr.weblog_top_ten.WebLogDriver,s3://program-emr/input/NASA_access_log_Jul95,s3://program-emr/run0
Schedule: Schedule created earlier for once an hour
Depends On: Copy input file to processing location

The type EmrActivity chosen for this activity tells Data Pipeline that you intend to use Amazon EMR resources. The parameters on an Amazon EMR activity are slightly different from the EC2 resource. The Step option appears as a field, and the settings provided in this example should look similar to the Create Job Flow options set on previous manual runs of Job Flows. Data Pipeline requires that your custom JAR and parameters be combined into a single Step field, with individual parameters separated by commas.

When we laid out the scheduling scenario earlier, we noted that the EMR resources should not be run if the input file did not exist and if the file was not successfully copied to the new S3 processing location. The Depends On option enforces this check and validates that the file has been copied to the processing location by setting the dependency on the successful completion of the S3 copy activity. You can add multiple Depends On fields if your scenario has several dependencies that need to be met prior to an activity running. The Runs On parameter is similar to the earlier activity and is configured to use the Amazon EMR resource with the settings listed in Table 3-5.

Table 3-5. Resource settings for the Amazon EMR resource

Terminate After: 30 minutes
Name: Amazon EMR cluster
Type: EmrCluster
Schedule: Schedule created earlier for once an hour
Core Instance Count: 2
Core Instance Type: m1.small
Master Instance Type: m1.small
Log Uri: s3://program-emr
Emr Log Uri: s3://program-emr

The Amazon EMR cluster settings look like what was input into the Amazon EMR Management Console in previous Job Flow runs. Core Instance Count, Core Instance Type, and Master Instance Type set the sizing of the Amazon EMR cluster groups. Log Uri, Emr Log Uri, and Terminate After are not required fields, but will again help in troubleshooting by providing log data from the Job Flow and limiting the execution time of the cluster if there is an issue in pipeline execution.

Scheduling Pipelines

Your pipeline now represents the functionality from the initial scheduling scenario and should look similar to Figure 3-11. It is in a pending state right now and will not run until the Activate option is selected. Once activated, the pipeline will appear as scheduled in the AWS Data Pipeline Management Console.

Figure 3-11. Fully built Job Flow scheduling Data Pipeline

Reviewing Pipeline Status

From the AWS Data Pipeline Management Console, selecting View Instance Details next to your pipeline allows you to determine the success and failure of the many different activities and nodes. In scheduling the pipeline, we intentionally removed the input file from S3. Figure 3-12 shows individual pipeline items waiting for the input file dependency to be met.

Figure 3-12. Data Pipeline waiting on dependencies

The check for the input file shows as Running, and the details indicate the number of retries AWS Data Pipeline will perform before waiting to try again later. The Data Pipeline Management Console can be used for troubleshooting scenarios like this and to check on successful and waiting pipeline processes.

AWS Pipeline Costs

AWS Data Pipeline is fairly cheap for the functionality it provides, with costs ranging from free to $2.50 per activity at the time of this writing. You can find more details on the AWS Data Pipeline pricing page. Compare this with running your own scheduler on even the smallest and cheapest EC2 micro Linux instance, which costs about $14.64 for a full month, and you can see how AWS Data Pipeline can lower operational costs.

Real-World Uses

You have now built the web server log processing portion of the log analysis application described in Chapter 1. The application can receive web access log records, identify the requests that resulted in errors, and report how frequently those errors occur in the log.

The building blocks demonstrated in this chapter have a number of functional uses outside of log analysis. The MapReduce application implements both the filtering and summarization design patterns, removing unwanted data from the data set and summing up the values of a common key. Other real-world applications of this design technique are:

Data cleansing

Often, data sets contain erroneous information, such as unrealistic values or records with so many missing values that they are of little use. Performing an initial data-cleansing phase by filtering these values out can lead to better analysis by other applications or other Amazon EMR Job Flows.

Distributed pattern matching

You can look for a specific string or match on a regular expression with a filter pattern. This match can be done in parallel across multiple instances and return matches much more quickly than traditional search routines, as shown in the sketch below.
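
As an illustration of that last point, a grep-style mapper needs only a few lines. This sketch (the class name and search pattern are ours) emits any input line matching a regular expression and relies on a map-only or identity-reduce configuration like the ones shown earlier in this chapter:

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DistributedGrepMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, NullWritable>
{
    // Example search pattern: lines referencing Apollo 13 pages
    private static final Pattern SEARCH = Pattern.compile("apollo-13");

    public void map( LongWritable key, Text value,
                     OutputCollector<Text, NullWritable> output,
                     Reporter reporter) throws IOException
    {
        // Emit the whole line when the pattern is found anywhere in it
        if ( SEARCH.matcher(value.toString()).find() )
        {
            output.collect( value, NullWritable.get() );
        }
    }
}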