Organizing and optimizing data in HDFS - Data logistics - Hadoop in Practice, Second Edition (2015)

Hadoop in Practice, Second Edition (2015)

Part 2. Data logistics

Chapter 4. Organizing and optimizing data in HDFS

This chapter covers

· Tips for laying out and organizing your data

· Data access patterns to optimize reading and writing your data

· The importance of compression, and choosing the best codec for your needs

In the previous chapter, we looked at how to work with different file formats in MapReduce and which ones were ideally suited for storing your data. Once you’ve homed in on the data format that you’ll be using, it’s time to start thinking about how you’ll organize your data in HDFS. It’s important that you give yourself enough time early on in the design of your Hadoop system to understand how your data will be accessed so that you can optimize for the more important use cases that you’ll be supporting.

There are numerous factors that will impact your data organization decisions, such as whether you’ll need to provide SQL access to your data (likely, you will), which fields will be used to look up the data, and what access-time SLAs you’ll need to support. At the same time, you need to make sure that you don’t apply unnecessary heap pressure on the HDFS NameNode with a large number of small files, and you also need to learn how to work with huge input datasets.

This chapter is dedicated to looking at ways to efficiently store and access big data in HDFS. I’ll first cover ways you can lay out data in HDFS and present some methods for partitioning and combining data to relieve NameNode heap pressure. Then I’ll discuss some data access patterns to help you work with disparate data as well as huge data sets. And finally, we’ll look at compression as a big data pattern to maximize your storage and processing capabilities.

Chapter prerequisites

This chapter assumes you have a basic understanding of HDFS concepts and that you have experience working directly with HDFS. If you need to become familiar with the topic, Hadoop in Action by Chuck Lam (Manning, 2010) offers the background information you’ll need on HDFS.

We’ll start things off with a look at how you can organize and manage your data.

4.1. Data organization

Data organization is one of the most challenging aspects of working with Hadoop. You have pressure from different groups in your organization, such as data scientists and your cluster administrators, each coming to you with competing requirements. What’s more, these requirements often come after your data applications are in production and you’ve already amassed large amounts of data.

There are multiple dimensions to data organization in Hadoop. You first need to decide how to organize your data in HDFS, after which you’ll be faced with operational issues such as how to partition and compact your data. You’ll need to decide whether to enable Kerberos to secure your cluster and how to manage and communicate data changes. These are all complex issues, and the goal of this chapter is to focus on some of the more challenging aspects of data organization, including data partitioning and compaction, starting off with how you can structure your data in HDFS.

4.1.1. Directory and file layout

Having a cluster-wide standard that defines how data is organized is a worthwhile pursuit: it makes it easier to discover where data is located, and it helps you apply consistent structure and management to the data you store. Because we’re working within the confines of what a filesystem can express, a common approach is to create a directory hierarchy that aligns with your organizational or functional structure. For example, if you work on the analytics team and you’re bringing a new dataset into the cluster, one way to organize your directories is shown in figure 4.1.

Figure 4.1. An example HDFS directory layout

Data revolution

Hopefully you’ve settled on a data format such as Avro, which offers you the ability to evolve your schema over time. That’s great, but how do you support a move to the Next Big Data Format, which will no doubt arrive as soon as everyone has migrated to Avro? Well, you can look to other software fields where semantic versioning concepts permeate interfaces such as URLs and adopt a similar strategy in your directory structure. By sticking a version number in your structure, you can give yourself the flexibility to move to the data format of tomorrow and communicate the differing file formats using the directory path.

Once you’ve embraced putting a version number in your directory, the only challenge left is communicating future changes to the consumers of your data. If this becomes a challenge, you may want to look at HCatalog as a way to abstract away data formats from your clients.

Partitioning by date and other fields

You may need your directory structure to model your organizational and data evolution needs, but why would you need further partitioning by date? This is a technique that Hive used early on to help speed up queries. If you put all of your data into a single directory, you’re essentially doing the Hadoop equivalent of a full table scan every time you need to access the data. Instead, it’s smarter to partition your data based on how you expect your data to be accessed.

It can be hard to know ahead of time exactly how data is going to be accessed, but a reasonable first attempt at partitioning is to segment data by the date when it was generated. If your data doesn’t have a date, then talk to the data producers about adding one, as the time at which an event or record was created is a critical data point that should always be captured.

4.1.2. Data tiers

In his 2012 Strata talk, Eric Sammer presented the idea of storing different tiers of data.[1] This is a powerful concept, and it also ties in nicely with one of the primary tenets of Nathan Marz’s Lambda Architecture—that of never deleting or modifying your raw data.

1 Eric Sammer, “Large scale ETL with Hadoop,” www.slideshare.net/OReillyStrata/large-scale-etl-with-hadoop.

At first glance, this may not seem to make any sense—surely once you extract the important parts of a data source, you can discard the rest! While it may seem wasteful to keep around raw data, especially if there are parts that aren’t being actively used, ask yourself this question—could some organizational value be extracted from the data in the future? It’s hard to answer this with a resounding “no.”

There are also occasionally bugs in our software. Imagine that you’re streaming data from the Twitter fire hose, producing some aggregations and discarding the source data. What happens if you discover a bug in your aggregation logic? You have no way to go back and regenerate the aggregated data.

Therefore, it’s recommended that you think of your data in terms of the following tiers:

· Raw data is the first tier. It’s the unaltered data you capture from the source. Data at this tier should never be modified because there’s a chance that your logic that produces derivatives or aggregations has bugs, and if you discard the raw data, you’ll remove your ability to regenerate your derived data upon discovering your bugs.

· Derived data is created from the raw data. Here you can perform deduplication, sanitization, and any other cleansing.

· Aggregated data is calculated from the derived data and will likely be fed into systems such as HBase or your NoSQL system of choice for real-time access to your data, both in production and for analytical purposes.

Data tiers should also be expressed in your directory layout so that users can easily differentiate between the tiers.

Once you’ve decided on a directory layout for partitioning your data, the next step is figuring out how you’re going to get your data into these partitions. That’s covered next.

4.1.3. Partitioning

Partitioning is the process by which you take a dataset and split it into distinct parts. These parts are the partitions, and they represent a meaningful division of your data. An example of a common partition in data is time, as it allows those querying the data to narrow in on a specific window of time. The previous section included time as a key element in deciding how to lay out your data in HDFS.

Great! You have a large dataset in HDFS and you need to partition it. How do you go about doing that? In this section I’ll present two methods you can employ to partition your data.

Technique 26 Using MultipleOutputs to partition your data

Imagine a situation where you have stock prices being streamed into HDFS, and you want to write a MapReduce job to partition your stock data based on the day of the stock quote. To do this, you’ll need to write to multiple output files in a single task. Let’s look at how you can make that happen.

Problem

You need to partition your data, but most output formats only create a single output file per task.

Solution

Use the MultipleOutputs class bundled with MapReduce.

Discussion

The MultipleOutputs class bypasses the normal channel by which Hadoop produces output. It provides a separate API to write partitioned outputs, and it writes output directly to the task attempt directory in HDFS. This is powerful, as you can continue to collect output using the standard write method on the Context object supplied to your job, and also use MultipleOutputs to write partitioned output. Of course, you can also choose to only use the MultipleOutputs class and ignore the standard Context-based output.

In this technique, you’ll use MultipleOutputs to partition stocks by their quote date. The first step is to set up MultipleOutputs for use in your job. In your driver, you’ll indicate the output format and the key and value types:
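Here’s a minimal sketch of what that driver setup might look like (this isn’t the book’s exact listing; TextOutputFormat and the Text key/value types are illustrative choices, and the named output "partition" matches the name discussed in the sidebar that follows):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultipleOutputsJob {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(MultipleOutputsJob.class);
    job.setMapperClass(PartitionMapper.class);  // shown in the next snippet
    job.setNumReduceTasks(0);                   // map-only, as in this technique

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Register a named output along with its output format and key/value types.
    MultipleOutputs.addNamedOutput(job, "partition",
        TextOutputFormat.class, Text.class, Text.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}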

Why do you need to name the output in the driver?

You may be wondering why MultipleOutputs requires you to specify an output name (partition in the preceding example). This is because MultipleOutputs supports two modes of operation—static partitions and dynamic partitions.

Static partitions work well if you know ahead of time the partition names; this gives you the additional flexibility of specifying a different output format for each partition (you’d just have multiple calls to MultipleOutputs.addNamedOutput with different named outputs). With static partitions, the output name you specify when calling addNamedOutput is the same name that you use when emitting output in your mapper or reducer.

This technique focuses on dynamic partitions, which you’re likely to find more useful, because in most cases you won’t know the partitions ahead of time. In this case, you still need to supply an output name, but for all intents and purposes it’s ignored, as you can dynamically specify the partition name in your mapper or reducer.

As you can see in the following code, your map (or reduce) class will get a handle to a MultipleOutputs instance and then use its write method to write partitioned outputs. Notice that the third argument is the partition name, which is the stock date:[2]

2 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/MultipleOutputsJob.java.
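Here’s a sketch of the corresponding mapper (again, not the book’s exact code); it’s written as a static nested class of the job in the previous sketch, and the CSV field positions used for the stock symbol and date are assumptions:

// Additional imports needed: java.io.IOException, org.apache.hadoop.io.LongWritable,
// org.apache.hadoop.mapreduce.Mapper.
public static class PartitionMapper
    extends Mapper<LongWritable, Text, Text, Text> {
  private MultipleOutputs<Text, Text> output;

  @Override
  protected void setup(Context context) {
    output = new MultipleOutputs<Text, Text>(context);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] parts = value.toString().split(",");
    String symbol = parts[0];
    String date = parts[1];   // e.g. 2008-01-02
    // The third argument is the base output path, which becomes the partition name.
    output.write(new Text(symbol), value, date);
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    output.close();           // flush and close every partition's file
  }
}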

Don’t forget the close method!

It’s important that you call the close method on MultipleOutputs in the cleanup method of your task. Otherwise it’s possible that you’ll have data missing from your output or even a corrupt file.

Let’s take a peek at this class in action. As you can see in the following output, running the previous example produces a number of partitioned files for the single mapper. You can also see the original map output file, which is empty because you haven’t emitted any records using the Context object:

$ hip hip.ch4.MultipleOutputsJob --input stocks.txt --output out1

$ hadoop fs -ls -R out1
out1/2000-01-03-m-00000
out1/2001-01-02-m-00000
out1/2002-01-02-m-00000
out1/2003-01-02-m-00000
out1/2004-01-02-m-00000
out1/2005-01-03-m-00000
out1/2006-01-03-m-00000
out1/2007-01-03-m-00000
out1/2008-01-02-m-00000
out1/2009-01-02-m-00000
out1/_SUCCESS
out1/part-m-00000

In this example you used a map-only job, but in production you’ll probably want to limit the number of tasks that create partitions. There are two ways you can do this:

· Use the CombineFileInputFormat or a custom input format to limit the number of mappers in your job.

· Use a reducer where you can explicitly specify a reasonable number of reducers.

Summary

There are plenty of things to like about MultipleOutputs: its support for both “old” and “new” MapReduce APIs and its support for multiple output format classes. But using MultipleOutputs does carry with it some constraints that you should be aware of:

· Be cautious when using MultipleOutputs in a mapper—remember that you’ll end up with NumberOfMappers * NumberOfPartitions output files, which in my experience can bring down clusters when both of those numbers are large!

· Each partition incurs the overhead of an HDFS file handle for the duration of the task.

· You can often end up with a large number of small files that accumulate across multiple uses of your partitioner. You’ll probably want to make sure that you have a compaction strategy in place to mitigate this problem (see section 4.1.4 for more details).

· Although Avro comes with the AvroMultipleOutputs class, it’s quite slow due to some inefficiencies in the code.

In addition to the MultipleOutputs approach, Hadoop also comes with a MultipleOutputFormat class that has features similar to MultipleOutputs. Its primary pitfalls are that it only supports the old MapReduce API and only one output format can be used for all the partitions.

Another partitioning strategy that you can employ is to use the MapReduce partitioner, which can help mitigate the large number of files that may be produced using MultipleOutputs.

Technique 27 Using a custom MapReduce partitioner

Another partitioning approach is to use the partitioning facilities built into MapReduce. By default, MapReduce uses a hash partitioner that calculates the hash of each map output key and performs a modulo over the number of reducers to determine which reducer the record should be sent to. You can control how partitioning occurs by writing a custom partitioner that routes records according to your own partitioning scheme.

This technique has an added benefit over the previous technique in that you’ll generally end up with fewer output files because each reducer will only create a single output file, as opposed to MultipleOutputs, where each map or reduce task will generate N output files—one for each partition.

Problem

You want to partition your input data.

Solution

Write a custom partitioner that partitions records to the appropriate reducer.

Discussion

Let’s look at the custom partitioner first. It exposes a helper method to the MapReduce driver that allows you to define a mapping from a date to a partition, and it writes this mapping to the job configuration. Then, when MapReduce loads the partitioner, it calls the setConf method; in this partitioner you’ll read the mappings into a map, which is subsequently used when partitioning.[3]

3 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/CustomPartitionerJob.java.
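A minimal sketch of such a partitioner follows (not the book’s exact code); the configuration property name and the serialization format for the date-to-partition mappings are assumptions:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class DatePartitioner extends Partitioner<Text, Text>
    implements Configurable {
  // Hypothetical property name used to carry the mappings in the job configuration.
  public static final String CONF_PARTITIONS = "date.partitioner.mappings";

  private Configuration conf;
  private final Map<String, Integer> datePartitions = new HashMap<String, Integer>();

  // Helper called by the driver to map a date to a specific reducer.
  public static void addPartitionToConfig(Configuration conf, String date, int partition) {
    String existing = conf.get(CONF_PARTITIONS, "");
    String mapping = date + "=" + partition;
    conf.set(CONF_PARTITIONS, existing.isEmpty() ? mapping : existing + "," + mapping);
  }

  // MapReduce calls setConf when it instantiates the partitioner; this is where
  // the mappings are read back out of the configuration.
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    for (String mapping : conf.get(CONF_PARTITIONS, "").split(",")) {
      String[] parts = mapping.split("=");
      if (parts.length == 2) {
        datePartitions.put(parts[0], Integer.valueOf(parts[1]));
      }
    }
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
    // The map output key is the stock date; unknown dates fall back to hashing.
    Integer partition = datePartitions.get(key.toString());
    return partition != null
        ? partition
        : (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}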

Your driver code needs to set up the custom partitioner configuration. The partitions in this example are dates, and you want to make sure that each reducer will correspond to a unique date. The stocks example data has 10 unique dates, so you configure your job with 10 reducers. You also call the partition helper function that was defined previously to set up the configuration that maps each unique date to a unique reducer.[4]

4 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/CustomPartitionerJob.java.

The mapper does little other than extract the stock date from the input data and emit it as the output key:[5]

5 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/CustomPartitionerJob.java.

The command to run the preceding example is as follows:

$ hip hip.ch4.CustomPartitionerJob --input stocks.txt --output output

This job will generate 10 output files, each containing the stocks for that day.

Summary

Using the MapReduce framework to naturally partition your data gives you a couple of advantages:

· Data in your partitions will be sorted because the shuffle will ensure that all data streamed to a reducer will be sorted. This allows you to use optimized join strategies on your data.

· You can deduplicate data in the reducer, again as a benefit of the shuffle phase.

The main problem to look out for with this technique is data skew. You want to make sure that you can spread the load across reducers as much as possible, which may be a challenge if there’s a natural skew in your data. For example, if your partitions are days, then it’s possible that the majority of your records will be for a single day, and you may have only a few records for either a previous or following day. In this case, you’ll ideally want to partition records in a way that allocates the majority of the reducers to a single day, and then maybe one or two for the previous or following days. You can also sample your inputs and dynamically determine the optimal number of reducers based on your sample data.

Once you’ve produced your partitioned output, the next challenge is how to deal with the potentially large number of small files that have resulted from the partitioning.

4.1.4. Compacting

Sometimes having small files in HDFS can’t be avoided—maybe you’re using a partitioning technique similar to those described previously, or maybe your data organically lands in HDFS in small file sizes. Either way, you’ll be exposing some weaknesses in HDFS and MapReduce, including the following:

· Hadoop’s NameNode keeps all the HDFS metadata in memory for fast metadata operations. Yahoo! estimated that each file, on average, occupies 600 bytes of space in memory,[6] which means that the metadata for 100 million files amounts to roughly 60 GB, all of which needs to be stored in the NameNode’s memory. That’s a lot of memory for a single process, even with today’s mid-tier server RAM capacities.

6 According to Yahoo! statistics, each block or file inode uses less than 200 bytes of memory, and on average each file occupies 1.5 blocks with a 3x replication factor. See Yahoo!’s page titled “Scalability of the Hadoop Distributed File System,” http://developer.yahoo.com/blogs/hadoop/posts/2010/05/scalability_of_the_hadoop_dist/ and a JIRA ticket titled “Name-node memory size estimates and optimization proposal,” https://issues.apache.org/jira/browse/HADOOP-1687.

· If your input to a MapReduce job is a large number of files, the number of mappers that will run (assuming your files are text or splittable) would be equivalent to the number of blocks that these files occupy. If you run a MapReduce job whose input is thousands or millions of files, your job will spend more time at the kernel layer dealing with creating and destroying your map task processes than it will on its work.

· Finally, if you’re running in a controlled environment where there’s a scheduler, you may have a cap on the number of tasks your MapReduce job can use. Because each file (by default) results in at least one map task, this could cause your job to be rejected by the scheduler.

If you’re thinking you won’t have this problem, think again. What percentage of your files are smaller than the HDFS block size?[7] And how much smaller are they—50%, 70%, 90%? What if your big data project takes off and suddenly you need to be able to scale to handle datasets that are several orders of magnitude greater in size? Isn’t that why you use Hadoop in the first place? To scale, you want to be able to add more nodes and then get back to your morning coffee. You don’t want to have to go back and redesign your use of Hadoop and deal with migrating your files. Thinking and preparing for this eventuality is best done early in your design phase.

7 The default block size is 128 MB. Check the value of dfs.block.size to see what it’s set to in your cluster.

This section examines some techniques that you can use to combine your data in HDFS. I’ll start off by discussing a utility called filecrush, which can compact small files together to create a smaller number of larger files. I’ll also show you how Avro can be used as a container format to store files that can’t be easily compacted, such as binary files.

Technique 28 Using filecrush to compact data

Compacting is the act of combining small files together to produce larger files—this helps alleviate heap pressure on the NameNode. In this technique, you’ll learn about an open source utility you can use to compact data and help keep your cluster administrator happy.

Compatibility with Hadoop versions

Currently the filecrush utility only works with Hadoop version 1. I’m writing a simple file compacter that’s compatible with Hadoop 2 at https://github.com/alexholmes/hdfscompact.

Problem

You want to combine small files to reduce the metadata that the NameNode needs to keep in memory.

Solution

Use the filecrush utility.

Discussion

The filecrush utility[8] combines or compacts multiple small files to form larger files. The utility is quite sophisticated and gives you the ability to

8 The filecrush GitHub project page is located at https://github.com/edwardcapriolo/filecrush.

· Determine the size threshold below which files will be compacted (and by association, leave files that are large enough alone)

· Specify the maximum size of the compacted files

· Work with different input and output formats and different input and output compression codecs (useful for moving to a different file format or compression codec)

· Swap smaller files with newer compacted files in place

We’ll use filecrush on a straightforward example—we’ll crush a single directory of small text files and replace them with gzipped SequenceFiles.

First, artificially create 10 input files in a directory in HDFS:

$ hadoop fs -mkdir crusher-dir
$ for i in `seq 1 10`; do
    hadoop fs -put test-data/stocks.txt crusher-dir/stocks$i.txt
  done

Now run filecrush. In this example, you’ll replace the small files with the new large file, and also convert from a text file to a compressed SequenceFile:
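The invocation takes the input directory, an output directory for the original files, and a timestamp. Here’s a hedged sketch of what the command might look like; the jar name, driver class, and exact option names are assumptions based on the filecrush project’s documentation, so check the project’s README for the specifics of your build:

$ hadoop jar filecrush-2.2.2-SNAPSHOT.jar com.m6d.filecrush.crush.Crush \
    --input-format text \
    --output-format sequence \
    --compress org.apache.hadoop.io.compress.GzipCodec \
    --clone \
    crusher-dir crusher-out 20140713162739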

After running filecrush, you’ll observe that the files in the input directory have been replaced by a single SequenceFile:

$ hadoop fs -ls -R crusher-dir

crusher-dir/crushed_file-20140713162739-0-0

You can also run the text Hadoop command to view the text representation of the SequenceFile:

$ hadoop fs -text crusher-dir/crushed_file-20140713162739-0-0

You’ll also notice that the original small files have all been moved to the output directory that you specified in your command:

$ hadoop fs -ls -R crusher-out
crusher-out/user/aholmes/crusher-dir/stocks1.txt
crusher-out/user/aholmes/crusher-dir/stocks10.txt
crusher-out/user/aholmes/crusher-dir/stocks2.txt
...

If you had run filecrush without the --clone option, the input files would have remained intact, and the crushed file would have been written to the output directory.

Input and output file size thresholds

How does filecrush determine whether files need to be crushed? It looks at each file in the input directory and compares it to the block size (or in Hadoop 2, the size that you specified in -Ddfs.block.size in the command). If the file is less than 75% of the block size, it will be crushed. This threshold can be customized by supplying the --threshold argument—for example, if you wanted to raise the value to 85%, you’d specify --threshold 0.85.

Similarly, filecrush uses the block size to determine the output file sizes. By default, it won’t create output files that occupy more than eight blocks, but this can be customized with the --max-file-blocks argument.

Summary

Filecrush is a simple and quick way to combine small files together. It supports any type of input or output files as long as there are associated input format and output format classes. Unfortunately, it doesn’t work with Hadoop 2, and there hasn’t been much activity in the project over the last few years, so these points may rule out this utility for your environment.

The example presented in this technique works well in situations where the directory being crushed is an external Hive table, or if you’re running it against a directory in a standard location where other users in a cluster expect your data to exist.

Currently, the filecrush project doesn’t work with Hadoop 2. If you’re looking for a solution for Hadoop 2, take a look at another HDFS compactor that I’m currently working on at https://github.com/alexholmes/hdfscompact.

Because filecrush requires input and output formats, one use case where it falls short is if you’re working with binary data and you need a way to combine small binary files together.

Technique 29 Using Avro to store multiple small binary files

Let’s say that you’re working on a project akin to Google Images, where you crawl the web and download image files from websites. Your project is internet-scale, so you’re downloading millions of files and storing them individually in HDFS. You already know that HDFS doesn’t work well with a large number of small files, but you’re dealing with binary data, so the previous technique doesn’t fit your needs.

This technique shows how you can use Avro as a container file format for binary data in HDFS.

Problem

You want to store a large number of binary files in HDFS, and to do so without hitting the NameNode memory limits.

Solution

The easiest way to work with small binary files in HDFS is to package them into a larger containing file. For this technique, you’ll read all of the files in a directory stored on local disk and save them in a single Avro file in HDFS. You’ll also see how to use the Avro file in MapReduce to process the contents of the original files.

Discussion

Figure 4.2 shows the first part of this technique, where you create the Avro file in HDFS. In doing so, you create fewer files in HDFS, which means less data to be stored in NameNode memory, which also means you can store more stuff.

Figure 4.2. Storing small files in Avro allows you to store more.

Avro is a data serialization and RPC library invented by Doug Cutting, the creator of Hadoop. Avro has strong schema-evolution capabilities that give it an advantage over competitors such as SequenceFile. Avro and its competitors were covered extensively in chapter 3.

Take a look at the Java code in the following listing, which will create the Avro file.[9]

9 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/SmallFilesWrite.java.

Listing 4.1. Read a directory containing small files and produce a single Avro file in HDFS
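The following is a condensed sketch of the listing (not the book’s exact code): it walks a local directory and writes each file’s path and raw bytes as a record in a single Avro data file in HDFS, printing an MD5 hash for each file as it goes. The record schema and the choice of Snappy for the Avro block compression are illustrative:

import java.io.File;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SmallFilesWrite {
  // A simple record schema: the original file path plus its raw contents.
  public static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"SmallFile\",\"fields\":["
      + "{\"name\":\"filename\",\"type\":\"string\"},"
      + "{\"name\":\"contents\",\"type\":\"bytes\"}]}");

  public static void main(String[] args) throws Exception {
    File sourceDir = new File(args[0]);          // local directory to package
    Path destFile = new Path(args[1]);           // Avro file to create in HDFS

    FileSystem hdfs = FileSystem.get(new Configuration());

    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>())
            .setCodec(CodecFactory.snappyCodec())   // block-compress the Avro file
            .create(SCHEMA, hdfs.create(destFile));
    try {
      for (File file : sourceDir.listFiles()) {
        byte[] contents = FileUtils.readFileToByteArray(file);
        GenericRecord record = new GenericData.Record(SCHEMA);
        record.put("filename", file.getAbsolutePath());
        record.put("contents", ByteBuffer.wrap(contents));
        writer.append(record);
        System.out.println(file.getAbsolutePath() + ": " + DigestUtils.md5Hex(contents));
      }
    } finally {
      writer.close();
    }
  }
}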

Compression dependency

To run the code in this chapter, you’ll need to have both the Snappy and LZOP compression codecs installed on your host. Please refer to the appendix for details on how to install and configure them.

Let’s see what happens when you run this script against Hadoop’s config directory (replace $HADOOP_CONF_DIR with the directory containing your Hadoop configuration files):

$ hip hip.ch4.SmallFilesWrite $HADOOP_CONF_DIR test.avro
/etc/hadoop/conf/ssl-server.xml.example: cb6f1b218...
/etc/hadoop/conf/log4j.properties: 6920ca49b9790cb...
/etc/hadoop/conf/fair-scheduler.xml: b3e5f2bbb1d6c...
...

Looks promising—let’s make sure that the output file is in HDFS:

$ hadoop fs -ls test.avro

2011-08-20 12:38 /user/aholmes/test.avro

To be sure everything’s working as expected, you can also write some code that will read the Avro file from HDFS and output the MD5 hash for each file’s content:[10]

10 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/SmallFilesRead.java.
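A condensed sketch of the reader (not the book’s exact code) follows; it iterates over the records in the Avro file and prints each original filename along with the MD5 hash of its contents:

import java.nio.ByteBuffer;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SmallFilesRead {
  public static void main(String[] args) throws Exception {
    FileSystem hdfs = FileSystem.get(new Configuration());

    // No schema needs to be supplied here; it's embedded in the Avro file itself.
    DataFileStream<GenericRecord> reader =
        new DataFileStream<GenericRecord>(hdfs.open(new Path(args[0])),
            new GenericDatumReader<GenericRecord>());
    try {
      for (GenericRecord record : reader) {
        ByteBuffer contents = (ByteBuffer) record.get("contents");
        byte[] bytes = new byte[contents.remaining()];
        contents.get(bytes);
        System.out.println(record.get("filename") + ": " + DigestUtils.md5Hex(bytes));
      }
    } finally {
      reader.close();
    }
  }
}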

This code is simpler than the write. Because Avro writes the schema into every Avro file, you don’t need to give Avro any information about the schema as part of deserialization. Give the code a spin:
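Assuming the reader takes the Avro file as its only argument, the invocation follows the same pattern as the writer:

$ hip hip.ch4.SmallFilesRead test.avro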

At this point you have Avro files in HDFS. Even though this chapter is about HDFS, the next thing you’ll likely want to do is process the files that you wrote in MapReduce. Let’s look at how to do that, writing a map-only MapReduce job that can read the Avro records as input and write out a text file containing the filenames and MD5 hashes of the file contents, as shown in figure 4.3.

Figure 4.3. Map job to read Avro files and write out a text file

The next listing shows the code for this MapReduce job.[11]

11 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/SmallFilesMapReduce.java.

Listing 4.2. A MapReduce job that takes as input Avro files containing the small files
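Here’s a condensed sketch of the job (not the book’s exact code); it reuses the filename/contents schema from the writer sketch and emits a tab-separated filename and MD5 hash for each record:

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SmallFilesMapReduce {

  public static class HashMapper
      extends Mapper<AvroKey<GenericRecord>, NullWritable, Text, Text> {
    @Override
    protected void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
        throws IOException, InterruptedException {
      GenericRecord record = key.datum();
      ByteBuffer contents = (ByteBuffer) record.get("contents");
      byte[] bytes = new byte[contents.remaining()];
      contents.get(bytes);
      context.write(new Text(record.get("filename").toString()),
          new Text(DigestUtils.md5Hex(bytes)));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(SmallFilesMapReduce.class);
    job.setMapperClass(HashMapper.class);
    job.setNumReduceTasks(0);                       // map-only

    job.setInputFormatClass(AvroKeyInputFormat.class);
    AvroJob.setInputKeySchema(job, SmallFilesWrite.SCHEMA);  // schema from the writer sketch

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}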

If you run this MapReduce job over the Avro file you created earlier, the job output will contain the filenames and MD5 hashes of the original files.
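Assuming the same positional arguments as the sketch above (the input Avro file and an output directory), the job could be launched like this:

$ hip hip.ch4.SmallFilesMapReduce test.avro output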

In this technique, it was assumed that you were working with a file format (such as image files) that couldn’t have separate files concatenated together. If your files can be concatenated, you should consider that option. If you go this route, try your best to make sure that the file size is at least as large as the HDFS block size to minimize the data stored in NameNode.

Summary

You could have used Hadoop’s SequenceFile as a mechanism to hold your small files. SequenceFile is a more mature technology, having been around longer than Avro files. But SequenceFiles are Java-specific, and they don’t provide the rich interoperability and versioning semantics you get with Avro.

Google’s Protocol Buffers, as well as Apache Thrift (which originated from Facebook), can also be used to store small files. But neither has an input format that works with native Thrift or Protocol Buffers files.

Another approach you could use is to write the files into a zip file. The downsides to this approach are first that you’d have to write a custom input format[12] to process the zip file, and second that zip files aren’t splittable (as opposed to Avro files and SequenceFiles). This could be mitigated by generating multiple zip files and attempting to make them close to the HDFS block size.

12 There has been a ticket open since 2008 asking for a zip input format implementation; see https://issues.apache.org/jira/browse/MAPREDUCE-210.

Hadoop also has a CombineFileInputFormat that can feed multiple input splits (across multiple files) into a single map task, which greatly decreases the number of map tasks needed to run.

You also could have created a tarball file containing all the files, and then produced a separate text file that contained the locations of the tarball file in HDFS. This text file would be supplied as the input to the MapReduce job, and the mapper would open the tarball directly. But that approach would circumvent data locality in MapReduce, because the mappers would be scheduled to execute on the node that contained the text file, and would therefore likely need to read the tarball blocks from remote HDFS nodes, incurring unnecessary network I/O.

Hadoop Archive files (HARs) are Hadoop files specifically created to solve the problem of small files. They are a virtual filesystem that sits on top of HDFS. The disadvantages of HAR files are that they can’t be optimized for local disk access in MapReduce, and they can’t be compressed.

Hadoop version 2 supports HDFS Federation, where HDFS is partitioned into multiple distinct namespaces, with each independently managed by a separate NameNode. This, in effect, means that the overall impact of keeping block information in memory can be spread across multiple NameNodes, thereby supporting a much larger number of small files. Hortonworks has a good blog post that contains more details about HDFS Federation (“An Introduction to HDFS Federation” [August 23, 2011], http://hortonworks.com/an-introduction-to-hdfs-federation/).

Finally, MapR, which provides a Hadoop distribution, has its own distributed filesystem that supports large numbers of small files. Using MapR for your distributed storage is a big change to your system, so it’s unlikely you’ll move to MapR to mitigate this problem with HDFS.

You may encounter times when you’ll want to work with small files in Hadoop, and using them directly would result in bloated NameNode memory use and MapReduce jobs that run slowly. This technique helps you mitigate these issues by packaging small files into larger container files. I picked Avro for this technique because of its support for splittable files and compression and its expressive schema language, which will help with versioning.

What if you have the opposite problem, where your files are big and you want to be more efficient about how you store your data? Our coverage of compression in Hadoop (section 4.2) will come to your rescue in these situations. But before we get to that section, let’s continue with our look at data organization and discover some tips on how to move data atomically in HDFS.

4.1.5. Atomic data movement

Activities such as partitioning and compacting tend to follow a similar pattern—they produce output files in a staging directory, and then need to atomically move them to their final destination once all the output files have been successfully staged. This may bring up some questions:

· What trigger do you use to determine that you’re ready to perform the atomic move?

· How do you move data atomically in HDFS?

· What impact does your data movement have on any readers of the final data?

It may be tempting to perform the atomic move as a postprocessing step within your MapReduce driver, but what will happen if the client process dies before the MapReduce application completes? This is where using the OutputCommitter in Hadoop is useful, because you can perform any atomic file movement as part of your job, as opposed to using the driver. An example of the OutputCommitter is shown in section 3.5.2.

The next question is how you can move data atomically in HDFS. For the longest time it was thought that the rename method on the DistributedFileSystem class (the concrete implementation backing HDFS) was atomic, but it turns out there are situations where it isn’t. This was remedied in HADOOP-6240, but for backward compatibility reasons the existing rename method wasn’t changed, so it’s still not truly atomic; instead, you need to use a newer API, which is a little cumbersome and only works with more recent versions of Hadoop:

DistributedFileSystem fs =
    (DistributedFileSystem) FileSystem.get(new Configuration());
fs.rename(src, dest, Options.Rename.NONE);

One thing that’s missing from HDFS is the ability to atomically swap directories. This would be useful in situations such as compacting, where you need to replace the entire contents of a directory that is being used by other processes such as Hive. There’s an open JIRA ticket titled “Atomic Directory Swapping Operation” (https://issues.apache.org/jira/browse/HDFS-5902) that will hopefully provide this ability in the future.

It’s important that you factor the points discussed here into the design of your system. And if you’re using a third-party utility or library, try to determine whether it’s atomically moving data.

This concludes our look at data organization techniques. Let’s switch to another important data management topic in Hadoop, that of data compression.

4.2. Efficient storage with compression

Data compression is a mechanism that reduces data to a more compact form to save on storage space and to make it more efficient to transfer the data. Compression is an important aspect of dealing with files, and it becomes all the more important when dealing with the data sizes that Hadoop supports. Your goal with Hadoop is to be as efficient as possible when working with your data, and picking a suitable compression codec will result in your jobs running faster and allow you to store more data in your cluster.[13]

13 A compression codec is a programming implementation capable of reading and writing a given compression format.

Technique 30 Picking the right compression codec for your data

Using compression with HDFS isn’t as transparent as it is on filesystems such as ZFS,[14] especially when dealing with compressed files that can be split (more on that later in this chapter). One of the advantages of working with file formats such as Avro and SequenceFile is their built-in compression support, making compression almost completely transparent to users. But you lose that luxury when working with file formats such as text.

14 ZFS, short for Z File System, is a filesystem developed by Sun Microsystems that provides innovative features to enhance data integrity.

Problem

You want to evaluate and determine the optimal compression codec for use with your data.

Solution

Snappy, a compression codec from Google, offers the best combination of compressed size and read/write execution times. But LZOP is the best codec when working with large compressed files that must support splittability.

Discussion

Let’s kick things off with a quick look at the compression codecs available for use in Hadoop, shown in table 4.1.

Table 4.1. Compression codecs

· Deflate: Deflate is the raw compression algorithm used by both zlib and gzip; it’s essentially gzip compression without the gzip file headers.

· gzip: The gzip file format consists of a header and a body, which contains a Deflate-compressed payload.

· bzip2: bzip2 is a space-efficient compression codec.

· LZO: LZO is a block-based compression algorithm that allows the compressed data to be split.

· LZOP: LZOP is LZO with additional headers. At one time, LZO/LZOP came bundled with Hadoop, but they have since been removed due to GPL licensing restrictions.

· LZ4: LZ4 is a speedy derivative of the same compression algorithm on which LZO is based.

· Snappy: Snappy (http://code.google.com/p/hadoop-snappy/) is a recent addition to the codec options in Hadoop. It’s Google’s open source compression algorithm, which Google uses for compressing data in both MapReduce and BigTable.[a] Snappy’s main drawback is that it’s not splittable. If you’re working with file formats that support splitting, such as Avro or Parquet, or your file sizes are smaller than or equal to your HDFS block size, you can ignore this drawback.

a BigTable is Google’s proprietary database system; see Fay Chang et al., “Bigtable: A Distributed Storage System for Structured Data,” http://research.google.com/archive/bigtable.html.

To properly evaluate the codecs, you first need to specify your evaluation criteria, which should be based on functional and performance traits. For compression, your criteria are likely to include the following:

· Space/time trade-off —Generally, the more computationally expensive compression codecs yield better compression ratios, resulting in smaller compressed outputs.

· Splittability —Can a compressed file be split for use by multiple mappers? If a compressed file can’t be split, only a single mapper will be able to work on it. If that file spans multiple blocks, you’ll lose out on data locality because the map will likely have to read blocks from remote DataNodes, incurring the overhead of network I/O.

· Native compression support —Is there a native library that performs compression and decompression? This will usually outperform a compression codec written in Java with no underlying native library support.

Table 4.2 compares the compression codecs currently available (we’ll cover the space/time comparison in the next section).

Table 4.2. Comparison of compression codecs

Codec     Extension     Licensing  Splittable  Java implementation  Native implementation
Deflate   .deflate      zlib       No          Yes                  Yes
gzip      .gz           GNU GPL    No          Yes                  Yes
bzip2     .bz2          BSD        Yes [a]     Yes                  Yes [b]
LZO       .lzo_deflate  GNU GPL    No          No                   Yes
LZOP      .lzo          GNU GPL    Yes [c]     No                   Yes
LZ4       .lz4          New BSD    No          No                   Yes
Snappy    .snappy       New BSD    No          No                   Yes

a The Java version of bzip2 is splittable in Hadoop 2 and 1.1.0 onward (see https://issues.apache.org/jira/browse/HADOOP-4012). The native version isn’t currently splittable.

b Native bzip2 support was added in Hadoop 2.1 (see https://issues.apache.org/jira/browse/HADOOP-8462).

c LZOP files aren’t natively splittable. You need to preprocess them to create an index file, which is then used by their respective CompressionCodec implementations to determine the file splits. We’ll cover how you can achieve this in technique 32.

Native vs. Java bzip2

Native support for bzip2 was recently added to Hadoop (starting from versions 2.0 and 1.1.0). Native bzip2 support is the default, but it doesn’t support splittability. If you need splittability with bzip2, you’ll need to enable the Java bzip2 implementation, which you can do by setting io.compression.codec.bzip2.library to java-builtin.

Now that you understand the codecs, how do they square up when looking at their space/time trade-offs? I used a 100 MB (10^8 bytes) Wikipedia XML file (enwik8.zip from http://mattmahoney.net/dc/textdata.html) to compare the codecs’ run times and compressed sizes. The results of these tests can be seen in table 4.3.

Table 4.3. Performance comparison of compression codecs on a 100 MB text file

Codec           Compression time (secs)  Decompression time (secs)  Compressed size (bytes)  Compressed percentage
Deflate         9.21                     1.02                       36,548,921               36.55%
gzip            9.09                     0.90                       36,548,933               36.55%
bzip2 (Java)    47.33                    6.45                       29,007,274               29.01%
bzip2 (native)  11.59                    4.66                       29,008,758               29.01%
LZO             2.10                     0.46                       53,926,001               53.93%
LZOP            2.09                     0.45                       53,926,043               53.93%
LZ4             1.72                     0.28                       57,337,587               57.34%
Snappy          1.75                     0.29                       58,493,673               58.49%

Running your own tests

When you’re performing your own evaluation, I recommend you perform your tests using your own data, and preferably on hosts similar to your production nodes. This way you’ll have a good sense of the expected compression and run times for the codecs.

Also make sure your cluster has native codecs enabled. You can check this by running the following command:

$ hadoop checknative -a

Figure 4.4 shows the compressed sizes in bar graph form.

Figure 4.4. Compressed file sizes for a single 100 MB text file (smaller values are better)

Figure 4.5 shows the compression and decompression times in bar graph form. These times will vary significantly based on hardware, and they’re only supplied here to give a sense of how they relate to each other.

Figure 4.5. Compression and decompression times for a single 100 MB text file (smaller values are better)

What do the space and time results tell you? If squeezing as much data into your cluster as possible is your top priority and you can live with long compression times, then bzip2 may be the right codec for you. If you want to compress your data but introduce the least amount of CPU overhead when it comes to reading and writing compressed files, you should look at LZ4. Anyone looking for a balance between compression and execution times would have to eliminate the Java version of bzip2 from the picture.

Being able to split your compressed files is important, and here you have to choose between bzip2 and LZOP. The native bzip2 codec doesn’t support splitting, and the Java bzip2 times will likely give most people pause. The only advantage of bzip2 over LZOP is that its Hadoop integration is much easier to work with than LZOP’s. Although LZOP is the natural winner here, it requires some effort to work with, as you’ll see in technique 32.

Summary

The best codec for you will depend on your criteria. LZ4 is the most promising codec if you don’t care about splitting your files, and LZOP is what you should be looking at if you want splittable files.

Another factor to consider is the long-term storage retention required for the data. If you’re keeping data for a long time, you’ll probably want to maximize the compression of your files, for which I would recommend a zlib-based codec (such as gzip). Because gzip isn’t splittable, though, it would be prudent to use it in combination with a block-based file format such as Avro or Parquet so that your data can still be split. Or you could size your outputs so they occupy a single block in HDFS so that splittability isn’t a concern.

Bear in mind that compressed sizes will vary based on whether your file is text or binary and depending on its contents. To get accurate numbers, you should run similar tests against your own data.

Compressing data in HDFS has many benefits, including reduced file sizes and faster MapReduce job runtimes. A number of compression codecs are available for use in Hadoop, and I evaluated them based on features and performance. Now you’re ready to start using compression. Let’s look at how you can compress files and use them with tools such as MapReduce, Pig, and Hive.

Technique 31 Compression with HDFS, MapReduce, Pig, and Hive

Because HDFS doesn’t provide built-in support for compression, it can be a challenge to work with compression in Hadoop. The onus falls on you to figure out how to work with compressed files. Also, splittable compression isn’t for the faint of heart, because it doesn’t come out of the box with Hadoop.[15] If you’re dealing with medium-size files that compress down to near-HDFS block size, this technique will be the quickest and simplest way to reap the benefits of compression in Hadoop.

15 Technically, you can get out-of-the-box splittable compression with bzip2, but its performance traits, as shown earlier in this section, rule it out as a serious compression codec.

Problem

You want to read and write compressed files in HDFS and also use them with MapReduce, Pig, and Hive.

Solution

Working with compressed files in MapReduce involves updating the MapReduce configuration file mapred-site.xml and registering the compression codec you’re using. After you do this, working with compressed input files in MapReduce requires no additional steps, and producing compressed MapReduce output is a matter of setting the mapred.output.compress and mapred.output.compression.codec MapReduce properties.

Discussion

The first step is to figure out how to read and write files using any of the codecs evaluated earlier in this chapter. All of the codecs detailed in this chapter are bundled with Hadoop except for LZO/LZOP and Snappy, so if you want to work with those three, you’ll need to download and build them yourself (I’ll walk you through how to work with LZO/LZOP later in this section).

To use the compression codecs, you need to know their class names, which are listed in table 4.4.

Table 4.4. Codec classes

Codec     Class                                          Default extension
Deflate   org.apache.hadoop.io.compress.DeflateCodec     deflate
gzip      org.apache.hadoop.io.compress.GzipCodec        gz
bzip2     org.apache.hadoop.io.compress.BZip2Codec       bz2
LZO       com.hadoop.compression.lzo.LzoCodec            lzo_deflate
LZOP      com.hadoop.compression.lzo.LzopCodec           lzo
LZ4       org.apache.hadoop.io.compress.Lz4Codec         lz4
Snappy    org.apache.hadoop.io.compress.SnappyCodec      snappy

Using compression in HDFS

How would you compress an existing file in HDFS using any one of the codecs mentioned in the previous table? The following code supports doing that:[16]

16 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/CompressedFileWrite.java.
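A sketch of what this looks like follows (not the book’s exact code); the codec class name is passed in as the first argument, for example org.apache.hadoop.io.compress.GzipCodec:

import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CompressedFileWrite {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem hdfs = FileSystem.get(conf);

    // Instantiate the codec from its class name.
    CompressionCodec codec = (CompressionCodec)
        ReflectionUtils.newInstance(Class.forName(args[0]), conf);

    Path src = new Path(args[1]);
    // Append the codec's default extension (e.g. ".gz") so it can be auto-detected later.
    Path dest = new Path(args[1] + codec.getDefaultExtension());

    InputStream is = null;
    OutputStream os = null;
    try {
      is = hdfs.open(src);
      os = codec.createOutputStream(hdfs.create(dest));
      IOUtils.copyBytes(is, os, conf);
    } finally {
      IOUtils.closeStream(os);
      IOUtils.closeStream(is);
    }
  }
}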

Codec caching

One of the overheads to using compression codecs is that they can be expensive to create. When you use the Hadoop ReflectionUtils class, some of the reflection overhead associated with creating the instance will be cached in ReflectionUtils, which should speed up subsequent creation of the codec. A better option would be to use the CompressionCodecFactory, which provides caching of the codecs themselves.

Reading this compressed file is as simple as writing it:[17]

17 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/CompressedFileRead.java.
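Here’s a matching sketch of the reader (not the book’s exact code); the CompressionCodecFactory picks the codec based on the file extension:

import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CompressedFileRead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem hdfs = FileSystem.get(conf);

    Path compressed = new Path(args[0]);
    // The factory maps the file extension (.gz, .bz2, .lzo, ...) to the registered codec.
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(compressed);

    InputStream is = null;
    try {
      is = codec.createInputStream(hdfs.open(compressed));
      IOUtils.copyBytes(is, System.out, conf);   // dump the decompressed contents to stdout
    } finally {
      IOUtils.closeStream(is);
    }
  }
}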

Super simple. Now that you can create compressed files, let’s look at how you can work with them in MapReduce.

Using compression in MapReduce

To work with compressed files in MapReduce, you need to set some configuration options for your job. For the sake of brevity, let’s assume identity mappers and reducers[18] in this example:[19]

18 An identity task is one that emits all of the input it receives as output, without any transformation or filtering.

19 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/CompressedMapReduce.java.
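A sketch of the driver-side settings follows (not the book’s exact code). The three compression-related lines are numbered in the comments; everything else is a vanilla job setup, and because no mapper or reducer classes are set, Hadoop’s identity implementations are used:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressedMapReduce {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // (1) Compress the intermediate map output as it's spilled and shuffled.
    conf.setBoolean("mapred.compress.map.output", true);
    // (2) Compress the final job output.
    conf.setBoolean("mapred.output.compress", true);
    // (3) Pick the codec for the final output (gzip here, purely as an example).
    conf.setClass("mapred.output.compression.codec",
        GzipCodec.class, CompressionCodec.class);

    Job job = Job.getInstance(conf);
    job.setJarByClass(CompressedMapReduce.class);
    // No mapper or reducer classes are set, so the identity Mapper and Reducer are used.

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}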

The only differences between a MapReduce job that works with uncompressed versus compressed I/O are the three annotated lines in the previous example.

Not only can a job’s input and output be compressed, but so can the intermediate map output, because it’s spilled first to disk, and then eventually over the network to the reducer. The effectiveness of compressing the map output will ultimately depend on the type of data being emitted, but as a general rule, you should see some job speed-up by making this change.

Why didn’t you have to specify the compression codec for the input file in the preceding code? By default the FileInputFormat class uses the CompressionCodecFactory to determine if the input file extension matches any of the registered codecs. If it finds a codec that’s associated with that file extension, it automatically uses that codec to decompress the input files.

How does MapReduce know which codecs to use? You need to specify the codecs in mapred-site.xml. The following code shows how to register all of the codecs we’ve evaluated. Remember that other than gzip, Deflate, and bzip2, all compression codecs need to be built and made available on your cluster before you can register them:

<property>
  <name>io.compression.codecs</name>
  <value>
    org.apache.hadoop.io.compress.GzipCodec,
    org.apache.hadoop.io.compress.DefaultCodec,
    org.apache.hadoop.io.compress.BZip2Codec,
    com.hadoop.compression.lzo.LzoCodec,
    com.hadoop.compression.lzo.LzopCodec,
    org.apache.hadoop.io.compress.SnappyCodec
  </value>
</property>
<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

Now that you’ve mastered compression with MapReduce, it’s time to look higher up the Hadoop stack. Because compression can also be used in conjunction with Pig and Hive, let’s see how you can mirror your MapReduce compression accomplishment using Pig and Hive. (As I’ll show in chapter 9, Hive is a higher-level language that abstracts away some of the complex details of MapReduce.)

Using compression in Pig

If you’re working with Pig, there’s no extra effort required to use compressed input files. All you need to do is ensure your filename extension maps to the appropriate compression codec (see table 4.4). The following example gzips a local password file, loads it into Pig, and dumps out the usernames:
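Here’s a sketch of what that session might look like; the relation names and the username alias are illustrative, although relation B matches the STORE statement used next:

$ gzip -c /etc/passwd > passwd.gz
$ hadoop fs -put passwd.gz .

$ pig
grunt> A = LOAD 'passwd.gz' USING PigStorage(':');
grunt> B = FOREACH A GENERATE $0 AS username;
grunt> DUMP B;
(root)
(bin)
(daemon)
...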

Writing out a gzipped file is the same—make sure you specify the extension for a compression codec. The following example stores the results of Pig relation B in a file in HDFS, and then copies them to the local filesystem to examine the contents:

grunt> STORE B INTO 'passwd-users.gz';
# Ctrl+C to break out of Pig shell
$ hadoop fs -get passwd-users.gz/part-m-00000.gz .
$ gunzip -c part-m-00000.gz
root
bin
daemon
...

That was straightforward—let’s hope things are equally smooth with Hive.

Using compression in Hive

As with Pig, all you need to do is specify the codec extension when defining the filename:
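Here’s a minimal sketch; the single-column table definition and the gzipped filename are illustrative, and the point is simply that the file being loaded carries the .gz extension:

hive> CREATE TABLE apachelog (line STRING);
hive> LOAD DATA LOCAL INPATH 'apachelog.txt.gz' INTO TABLE apachelog;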

The previous example loaded a gzipped file into Hive. In this situation, Hive moves the file being loaded into Hive’s warehouse directory and continues to use the raw file as its storage for the table.

What if you want to create another table and also specify that it should be compressed? The following example achieves this by setting some Hive configs to enable MapReduce compression (because a MapReduce job will be executed to load the new table in the last statement):

hive> SET hive.exec.compress.output=true;
hive> SET hive.exec.compress.intermediate=true;
hive> SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
hive> CREATE TABLE apachelog_backup (...);
hive> INSERT OVERWRITE TABLE apachelog_backup SELECT * FROM apachelog;

You can verify that Hive is indeed compressing the storage for the new apachelog_backup table by looking at it in HDFS:

$ hadoop fs -ls /user/hive/warehouse/apachelog_backup

/user/hive/warehouse/apachelog_backup/000000_0.gz

It should be noted that Hive recommends using SequenceFile as the output format for tables because SequenceFile blocks can be individually compressed.

Summary

This technique provides a quick and easy way to get compression running in Hadoop. It works well for files that aren’t too large because it offers a fairly transparent way of working with compression.

If your compressed file sizes are much larger than the HDFS block size, read on for compression techniques that can split your files.

Technique 32 Splittable LZOP with MapReduce, Hive, and Pig

Imagine that you’re working with large text files that, even when compressed, are many times larger than the HDFS block size. To avoid having one map task process an entire large compressed file, you’ll need to pick a compression codec that can support splitting that file.

LZOP fits the bill, but working with it is more complex than the examples detailed in the previous technique because LZOP is not in and of itself splittable. “Wait,” you may be thinking, “didn’t you state earlier that LZOP is splittable?” LZOP is block-based, but you can’t perform a random seek into an LZOP file and determine the next block’s starting point. This is the challenge we’ll tackle in this technique.

Problem

You want to use a compression codec that will allow MapReduce to work in parallel on a single compressed file.

Solution

In MapReduce, splitting large LZOP-compressed input files requires the use of LZOP-specific input format classes, such as LzoInputFormat. The same principle applies when working with LZOP-compressed input files in both Pig and Hive.

Discussion

The LZOP compression codec is one of only two codecs that allow compressed files to be split, and therefore worked on in parallel by multiple map tasks. The other codec, bzip2, suffers from compression times that are so slow they arguably render the codec unusable. LZOP also offers a good compromise between compression and speed.

What’s the difference between LZO and LZOP?

Both the LZO and LZOP codecs are supplied for use with Hadoop. LZO is a stream-based compression format that doesn’t have the notion of blocks or headers. LZOP has the notion of blocks (which are checksummed), and therefore is the codec you want to use, especially if you want your compressed output to be splittable. Confusingly, the Hadoop codecs by default treat files ending with the .lzo extension as LZOP-encoded, and files ending with the .lzo_deflate extension as LZO-encoded. Also, much of the documentation seems to use LZO and LZOP interchangeably.

Preparing your cluster for LZOP

Unfortunately, Hadoop doesn’t bundle LZOP for licensing reasons.[20]

20 LZOP used to be included with Hadoop, but with the work performed in JIRA ticket https://issues.apache.org/jira/browse/HADOOP-4874, it was removed from Hadoop version 0.20 and newer releases due to LZOP’s GPL licensing limiting its redistribution.

Getting all the prerequisites compiled and installed on your cluster is laborious, but rest assured that there are detailed instructions in the appendix. To compile and run the code in this section, you’ll need to follow the instructions in the appendix.

Reading and writing LZOP files in HDFS

We covered how to read and write compressed files in section 4.2. To perform the same activity with LZOP requires you to specify the LZOP codec in your code. This code is shown in the following listing.[21]

21 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/LzopFileReadWrite.java.

Listing 4.3. Methods to read and write LZOP files in HDFS

public static Path compress(Path src, Configuration config)
    throws IOException {
  Path destFile = new Path(
      src.toString() + new LzopCodec().getDefaultExtension());

  LzopCodec codec = new LzopCodec();
  codec.setConf(config);

  FileSystem hdfs = FileSystem.get(config);
  InputStream is = null;
  OutputStream os = null;
  try {
    is = hdfs.open(src);
    os = codec.createOutputStream(hdfs.create(destFile));

    IOUtils.copyBytes(is, os, config);
  } finally {
    IOUtils.closeStream(os);
    IOUtils.closeStream(is);
  }
  return destFile;
}

public static void decompress(Path src, Path dest, Configuration config)
    throws IOException {
  LzopCodec codec = new LzopCodec();
  codec.setConf(config);

  FileSystem hdfs = FileSystem.get(config);
  InputStream is = null;
  OutputStream os = null;
  try {
    is = codec.createInputStream(hdfs.open(src));
    os = hdfs.create(dest);

    IOUtils.copyBytes(is, os, config);
  } finally {
    IOUtils.closeStream(os);
    IOUtils.closeStream(is);
  }
}

Let’s write and read an LZOP file, and then make sure that LZOP utilities can work with the generated file (replace $HADOOP_CONF_DIR with the location of your Hadoop config directory):

$ hadoop fs -put $HADOOP_CONF_DIR/core-site.xml core-site.xml
$ hip hip.ch4.LzopFileReadWrite core-site.xml

The preceding code will generate a core-site.xml.lzo file in HDFS.

Now make sure you can use this LZOP file with the lzop binary. Install an lzop binary on your host.[22] Copy the LZOP file from HDFS to local disk, uncompress it with the native lzop binary, and compare it with the original file:

22 For RedHat and Centos, you can install the rpm from http://pkgs.repoforge.org/lzop/lzop-1.03-1.el5.rf.x86_64.rpm.

$ hadoop fs -get core-site.xml.lzo /tmp/core-site.xml.lzo
$ lzop -l /tmp/core-site.xml.lzo
method      compressed  uncompr.  ratio  uncompressed_name
LZO1X-1            454       954  47.6%  core-site.xml
$ cd /tmp
$ lzop -d core-site.xml.lzo
$ ls -ltr
-rw-r--r-- 1 aholmes aholmes 954 May  5 09:05 core-site.xml
-rw-r--r-- 1 aholmes aholmes 504 May  5 09:05 core-site.xml.lzo
$ diff core-site.xml $HADOOP_CONF_DIR/core-site.xml
$

The diff verified that the file compressed with the LZOP codec could be decompressed with the lzop binary.

Now that you have your LZOP file, you need to index it so that it can be split.

Creating indexes for your LZOP files

Earlier I made the paradoxical statement that LZOP files can be split, but that they’re not natively splittable. Let me clarify what that means—the lack of block-delimiting synchronization markers means you can’t do a random seek into an LZOP file and start reading blocks. But because internally it does use blocks, all you need is a preprocessing step that can generate an index file containing the block offsets.

The LZOP file is read in its entirety, and block offsets are written to the index file as the read is occurring. The index file format, shown in figure 4.6, is a binary file containing a consecutive series of 64-bit numbers that indicate the byte offset for each block in the LZOP file.

Figure 4.6. An LZOP index file is a binary file containing a consecutive series of 64-bit numbers.
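To make the format concrete, here’s a minimal sketch (not part of the LZO library; the class name DumpLzoIndex is just for illustration) that prints the block offsets recorded in an index file. It assumes the offsets are written as big-endian 64-bit values, which is what Java’s DataInputStream reads:

import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DumpLzoIndex {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Example usage: hadoop DumpLzoIndex core-site.xml.lzo.index
    Path index = new Path(args[0]);
    long length = fs.getFileStatus(index).getLen();

    DataInputStream in = fs.open(index);
    try {
      // Each 8-byte entry is the byte offset of a compressed LZOP block.
      for (long bytesRead = 0; bytesRead < length; bytesRead += 8) {
        System.out.println(in.readLong());
      }
    } finally {
      in.close();
    }
  }
}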

You can create index files in one of two ways, as shown in the following two code snippets. If you want to index a single LZOP file, you can run the LzoIndexer class directly from the command line:

$ hadoop com.hadoop.compression.lzo.LzoIndexer core-site.xml.lzo

The following option works well if you have a large number of LZOP files and you want a more efficient way to generate the index files. The indexer runs a MapReduce job to create the index files. Both files and directories (which are scanned recursively for LZOP files) are supported:

$ hadoop \
    com.hadoop.compression.lzo.DistributedLzoIndexer \
    core-site.xml.lzo

Both approaches generate an index file in the same directory as the LZOP file. The index filename is the original LZOP filename suffixed with .index, so running the previous commands yields the file core-site.xml.lzo.index.

Now let’s look at how you can use the LzoIndexer in your Java code. The following code (from the main method of LzoIndexer) creates the index file synchronously:

LzoIndexer lzoIndexer = new LzoIndexer(new Configuration());
for (String arg : args) {
  try {
    // Blocks until the index file for this LZOP file has been written.
    lzoIndexer.index(new Path(arg));
  } catch (IOException e) {
    LOG.error("Error indexing " + arg, e);
  }
}

DistributedLzoIndexer launches a MapReduce job with N mappers, one for each .lzo file. No reducers are run; the identity mapper, via the custom LzoSplitInputFormat and LzoIndexOutputFormat, writes the index files directly. If you want to run the MapReduce job from your own Java code, you can use the DistributedLzoIndexer code as an example.
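For example, here’s one way you might kick off the distributed indexer from your own driver. This is only a sketch: it assumes DistributedLzoIndexer implements Hadoop’s Tool interface (which is how its command-line entry point invokes it), so check the hadoop-lzo version on your cluster before relying on it; the class name IndexLzoFiles is just for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import com.hadoop.compression.lzo.DistributedLzoIndexer;

public class IndexLzoFiles {
  public static void main(String[] args) throws Exception {
    // Each argument is an .lzo file or a directory to scan recursively for .lzo files.
    int exitCode = ToolRunner.run(new Configuration(),
        new DistributedLzoIndexer(), args);
    System.exit(exitCode);
  }
}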

You need these index files so that LZOP files can be split in your MapReduce, Pig, and Hive jobs. Now that you have them, let’s look at how to use them with MapReduce.

MapReduce and LZOP

After you’ve created index files for your LZOP files, it’s time to start using them with MapReduce. Unfortunately, this brings us to the next challenge: none of Hadoop’s built-in file-based input formats work with splittable LZOP, because computing the input splits requires specialized logic that reads the LZOP index files. You need LZOP-specific input format classes to work with splittable LZOP.

The LZOP library provides an LzoTextInputFormat implementation for line-oriented LZOP-compressed text files with accompanying index files.[23]

23 The LZOP input formats also work well with LZOP files that don’t have index files.

The following code shows the steps required to configure the MapReduce job to work with LZOP. You would perform these steps for a MapReduce job that had text LZOP inputs and outputs:[24]

24 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/LzopMapReduce.java.

job.setInputFormatClass(LzoTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.getConfiguration().setBoolean("mapred.output.compress", true);
job.getConfiguration().setClass("mapred.output.compression.codec",
    LzopCodec.class, CompressionCodec.class);

Compressing intermediate map output will also reduce the overall execution time of your MapReduce jobs:

conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec",
    LzopCodec.class, CompressionCodec.class);

You can configure your cluster to always compress map output by adding the following to mapred-site.xml:

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>com.hadoop.compression.lzo.LzopCodec</value>
</property>

The number of splits per LZOP file is a function of the number of LZOP blocks that the file occupies, not the number of HDFS blocks that the file occupies.

Now that we’ve covered MapReduce, let’s look at how Pig and Hive can work with splittable LZOP.

Pig and Hive

Elephant Bird,[25] a Twitter project containing utilities to work with LZOP, provides a number of useful MapReduce and Pig classes. Elephant Bird has an LzoPigStorage class that works with text-based, LZOP-compressed data in Pig.

25 See the appendix for more details on Elephant Bird.

Hive can work with LZOP-compressed text files by using the com.hadoop.mapred.DeprecatedLzoTextInputFormat input format class found in the LZO library.

Summary

Working with splittable compression in Hadoop is tricky. If you’re fortunate enough to be able to store your data in Avro or Parquet, they offer the simplest way to work with files that can be easily compressed and split. If you want to compress other file formats and need them to be split, LZOP is the only real candidate.

As I mentioned earlier, the Elephant Bird project provides some useful LZOP input formats that will work with LZOP-compressed file formats such as XML and plain text. If you need to work with an LZOP-compressed file format that isn’t supported by either Todd Lipcon’s LZO project or Elephant Bird, you’ll have to write your own input format. This is a big hurdle for developers. I hope at some point Hadoop will be able to support compressed files with custom splitting logic so that end users don’t have to write their own input formats for compression.

Compression is likely to be a hard-and-fast requirement in any production environment where resources are always scarce. Compression also shortens the execution times of your computational jobs, which makes it a compelling part of your storage strategy. In the previous section I showed you how to evaluate and pick the codec best suited for your data. We also covered how to use compression with HDFS, MapReduce, Pig, and Hive. Finally, we tackled the tricky subject of splittable LZOP compression.

4.3. Chapter summary

Big data in the form of large numbers of small files brings to light a limitation in HDFS, and in this chapter we worked around it by looking at how you can package small files into larger Avro containers.

Compression is a key part of any large cluster, and we evaluated and compared the different compression codecs. I recommended codecs based on various criteria and also showed you how to compress and work with these compressed files in MapReduce, Pig, and Hive. We also looked at how to work with LZOP to achieve compression as well as blazing-fast computation with multiple input splits.

This and the previous chapter were dedicated to looking at techniques for picking the right file format and working effectively with big data in MapReduce and HDFS. It’s now time to apply this knowledge and look at how to move data in and out of Hadoop. That’s covered in the next chapter.