Hadoop: The Definitive Guide (2015)
Appendix C. Preparing the NCDC Weather Data
This appendix runs through the steps taken to prepare the raw weather datafiles so that they are in a form amenable to analysis using Hadoop. If you just want a copy of the data to process using Hadoop, follow the instructions given at the website that accompanies this book. The rest of this appendix explains how the raw weather datafiles were processed.
The raw data is provided as a collection of tar files, compressed with bzip2. Each year’s worth of readings comes in a separate file. Here’s a partial directory listing of the files:
1901.tar.bz2
1902.tar.bz2
1903.tar.bz2
...
2000.tar.bz2
Each tar file contains a file for each weather station’s readings for the year, compressed with gzip. (The fact that the files in the archive are compressed makes the bzip2 compression on the archive itself redundant.) For example:
% tar jxf 1901.tar.bz2
% ls 1901 | head
029070-99999-1901.gz
029500-99999-1901.gz
029600-99999-1901.gz
029720-99999-1901.gz
029810-99999-1901.gz
227070-99999-1901.gz
Because there are tens of thousands of weather stations, the whole dataset is made up of a large number of relatively small files. It’s generally easier and more efficient to process a smaller number of relatively large files in Hadoop (see “Small files and CombineFileInputFormat”), so in this case, I concatenated the decompressed files for a whole year into a single file, named by the year. I did this using a MapReduce program, to take advantage of its parallel processing capabilities. Let’s take a closer look at the program.
The program has only a map function. No reduce function is needed because each map task processes its own file independently, in parallel with the others, and nothing has to be combined across tasks afterward. The processing can be done with a Unix script, so the Streaming interface to MapReduce is appropriate in this case; see Example C-1.
Example C-1. Bash script to process raw NCDC datafiles and store them in HDFS
#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI
read -r offset s3file
# Retrieve file from S3 to local disk
echo "reporter:status:Retrieving $s3file" >&2
"$HADOOP_HOME"/bin/hadoop fs -get "$s3file" .
# Un-bzip and un-tar the local file
target=$(basename "$s3file" .tar.bz2)
mkdir -p "$target"
echo "reporter:status:Un-tarring $s3file to $target" >&2
tar jxf "$(basename "$s3file")" -C "$target"
# Un-gzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in "$target"/*/*
do
  gunzip -c "$file" >> "$target.all"
  echo "reporter:status:Processed $file" >&2
done
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c "$target.all" | "$HADOOP_HOME"/bin/hadoop fs -put - "gz/$target.gz"
The input is a small text file (ncdc_files.txt) listing all the files to be processed (the files start out on S3, so they are referenced using S3 URIs that Hadoop understands). Here is a sample:
s3n://hadoopbook/ncdc/raw/isd-1901.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1902.tar.bz2
...
s3n://hadoopbook/ncdc/raw/isd-2000.tar.bz2
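A list in this form does not have to be typed by hand. The following is a minimal sketch that prints one URI per year, assuming every year in the requested range has an archive that follows the naming pattern of the sample; the function name list_ncdc_archives is hypothetical and not part of the book's code.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print one S3 URI per year from first to last (inclusive).
# Assumes each year's archive follows the isd-YEAR.tar.bz2 pattern in the sample.
list_ncdc_archives() {
  local first=$1 last=$2
  local year=$first
  while [ "$year" -le "$last" ]; do
    echo "s3n://hadoopbook/ncdc/raw/isd-$year.tar.bz2"
    year=$((year + 1))
  done
}
```

Redirecting the output of a call such as list_ncdc_archives 1901 1903 to a file gives a small input list that is handy for a trial run.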
Because the input format is specified to be NLineInputFormat, each mapper receives one line of input, which contains the file it has to process. The processing is explained in the script, but briefly, it unpacks the bzip2 file and then concatenates each station file into a single file for the whole year. Finally, the file is gzipped and copied into HDFS. Note the use of hadoop fs -put - to consume from standard input.
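The concatenation step can be tried locally, without Hadoop or S3, on a directory that has already been untarred. The sketch below mirrors the un-gzip, concatenate, and gzip steps of Example C-1 but writes to the local filesystem instead of piping to hadoop fs -put. The function name concat_year and its two arguments are assumptions made for illustration.

```bash
#!/usr/bin/env bash
# Hypothetical local version of the concatenation step in Example C-1.
# $1: directory that directly contains the gzipped station files
# $2: path of the single gzipped output file for the year
concat_year() {
  local dir=$1 out=$2
  local file
  for file in "$dir"/*.gz; do
    # Skip the unexpanded pattern when the directory has no .gz files
    [ -e "$file" ] || continue
    gunzip -c "$file"
  done | gzip -c > "$out"
}
```

Streaming the decompressed data straight into gzip avoids the intermediate uncompressed file that the script in Example C-1 builds, at the cost of not being able to inspect that file afterward.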
Status messages are echoed to standard error with a reporter:status prefix so that they get interpreted as MapReduce status updates. This tells Hadoop that the script is making progress and is not hanging.
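Because the same prefix is repeated on every status line, it can be wrapped in a small helper. This is only a sketch; the function name status is an assumption and does not appear in Example C-1.

```bash
#!/usr/bin/env bash
# Hypothetical helper: write a Streaming status update to standard error.
# Hadoop Streaming treats stderr lines starting with "reporter:status:" as status updates.
status() {
  echo "reporter:status:$*" >&2
}

# Example call, equivalent to: echo "reporter:status:Retrieving example file" >&2
status "Retrieving example file"
```

Keeping the messages on standard error matters: in a Streaming mapper, anything written to standard output is treated as map output.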
The script to run the Streaming job is as follows:
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -D mapred.reduce.tasks=0 \
  -D mapred.map.tasks.speculative.execution=false \
  -D mapred.task.timeout=12000000 \
  -input ncdc_files.txt \
  -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
  -output output \
  -mapper load_ncdc_map.sh \
  -file load_ncdc_map.sh
I set the number of reduce tasks to zero, since this is a map-only job. I also turned off speculative execution so duplicate tasks wouldn’t write the same files (although the approach discussed in “Task side-effect files” would have worked, too). The task timeout was set to a high value (12,000,000 milliseconds, or 200 minutes) so that Hadoop wouldn’t kill tasks that take a long time without reporting progress (for example, when unarchiving files or copying to HDFS).
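The three property names in the command are the older mapred.* spellings, which Hadoop 2 still accepts but treats as deprecated. As a sketch, assuming a Hadoop 2 or later installation, the same settings can be given under their newer names. The array job_opts and the function submit_load_job are illustrative names only; the remaining arguments are copied from the command above.

```bash
#!/usr/bin/env bash
# Newer names for the three properties; the mapred.* forms are deprecated aliases.
job_opts=(
  -D mapreduce.job.reduces=0            # map-only job
  -D mapreduce.map.speculative=false    # no duplicate map attempts
  -D mapreduce.task.timeout=12000000    # in milliseconds (200 minutes)
)

# Hypothetical wrapper: submits the Streaming job when called on a cluster node.
# Defining it has no side effects; nothing runs until the function is invoked.
submit_load_job() {
  hadoop jar "$HADOOP_HOME"/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    "${job_opts[@]}" \
    -input ncdc_files.txt \
    -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
    -output output \
    -mapper load_ncdc_map.sh \
    -file load_ncdc_map.sh
}
```

The -D options are generic options, so they stay ahead of the Streaming-specific options such as -input and -mapper, as in the original command.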
Finally, the files were archived on S3 by copying them from HDFS using distcp.