Chapter 2. A Quick Start Guide to Flume

As we covered some of the basics in the previous chapter, this chapter will help you get started with Flume. So, let's start with the first step: downloading and configuring Flume.

Downloading Flume

Let's download Flume from http://flume.apache.org/. Look for the download link in the side navigation. You'll see two compressed .tar archives available along with the checksum and GPG signature files used to verify the archives. Instructions to verify the download are on the website, so I won't cover them here. Checking the checksum file contents against the actual checksum verifies that the download was not corrupted. Checking the signature file validates that all the files you are downloading (including the checksum and signature) came from Apache and not some nefarious location. Do you really need to verify your downloads? In general, it is a good idea and it is recommended by Apache that you do so. If you choose not to, I won't tell.

The binary distribution archive has bin in the name, and the source archive is marked with src. The source archive contains just the Flume source code. The binary distribution is much larger because it contains not only the Flume source and the compiled Flume components (jars, javadocs, and so on), but also all the dependent Java libraries. The binary package contains the same Maven POM file as the source archive, so you can always recompile the code even if you start with the binary distribution.
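
For example, if you later want to rebuild the jars from the bundled source, a standard Maven build run from the top of the exploded archive would look something like the following. This is a generic Maven invocation rather than a Flume-specific command, and -DskipTests simply skips the test suite to save time:

$ mvn clean install -DskipTests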

Go ahead, download and verify the binary distribution to save us some time in getting started.

Flume in Hadoop distributions

Flume is available with some Hadoop distributions. These distributions bundle Hadoop's core components and satellite projects (such as Flume) in a way that is intended to ensure version compatibility and to fold in additional bug fixes. These distributions aren't better or worse; they're just different.

There are benefits to using a distribution. Someone else has already done the work of pulling together all the version-compatible components. Today, this is less of an issue since the Apache BigTop project started (http://bigtop.apache.org/). Nevertheless, prebuilt standard OS packages, such as RPMs and DEBs, ease installation and provide startup/shutdown scripts. Each distribution offers different levels of free and paid options, including paid professional services if you really get into a situation you just can't handle.

There are downsides, of course. The version of Flume bundled in a distribution will often lag quite a bit behind the Apache releases. If there is a new or bleeding-edge feature you are interested in using, you'll either be waiting for your distribution's provider to backport it for you, or you'll be stuck patching it yourself. Furthermore, while the distribution providers do a fair amount of testing, as with any general-purpose platform, you will most likely encounter something that their testing didn't cover. In that case, you are still on the hook to come up with a workaround or dive into the code, fix it, and hopefully submit that patch back to the open source community (where, at some future point, it'll make it into an update of your distribution or the next version).

So, things move slower in a Hadoop distribution world. You can see that as good or bad. Usually, large companies don't like the instability of bleeding-edge technology or making changes often, as change can be the most common cause of unplanned outages. You'd be hard pressed to find such a company using the bleeding-edge Linux kernel rather than something like Red Hat Enterprise Linux (RHEL), CentOS, Ubuntu LTS, or any of the other distributions whose target is stability and compatibility. If you are a startup building the next Internet fad, you might need that bleeding-edge feature to get a leg up on the established competition.

If you are considering a distribution, do the research and see what you are getting (or not getting) with each. Remember that each of these offerings is hoping that you'll eventually want and/or need their Enterprise offering, which usually doesn't come cheap. Do your homework.

Note

Here's a short, nondefinitive list of some of the more established players. For more information, refer to the following links:

· Cloudera: http://cloudera.com/

· Hortonworks: http://hortonworks.com/

· MapR: http://mapr.com/

An overview of the Flume configuration file

Now that we've downloaded Flume, let's spend some time going over how to configure an agent.

A Flume agent's default configuration provider uses a simple Java property file of key/value pairs that you pass as an argument to the agent upon startup. As you can configure more than one agent in a single file, you will need to additionally pass an agent identifier (called a name) so that the agent knows which configuration to use. In my examples, where I'm only specifying one agent, I'm going to use the name agent.
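
For instance, if a single property file defined two agents, each agent's keys would carry its own name as the prefix, and the name passed at startup selects which set gets loaded. The agent names below are purely illustrative:

agent1.sources=s1
agent1.channels=c1
agent1.sinks=k1
agent2.sources=s1
agent2.channels=c1
agent2.sinks=k1

Starting the process with the name agent1 would load only the keys prefixed with agent1 and ignore everything prefixed with agent2.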

Note

By default, the configuration property file is monitored for changes every 30 seconds. If a change is detected, Flume will attempt to reconfigure itself. In practice, many of the configuration settings cannot be changed after the agent has started. Save yourself some trouble and pass the undocumented --no-reload-conf argument when starting the agent (except in development situations perhaps).

If you use the Cloudera distribution, the passing of this flag is currently not possible. I've opened a ticket to fix that at https://issues.cloudera.org/browse/DISTRO-648. If this is important to you, please vote it up.
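
For reference, the flag is simply appended to the normal startup command covered later in this chapter; a sketch using the hw.conf file we'll create shortly would look like this:

$ ./bin/flume-ng agent -n agent -c conf -f conf/hw.conf --no-reload-conf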

Each agent is configured, starting with three parameters:

agent.sources=<list of sources>

agent.channels=<list of channels>

agent.sinks=<list of sinks>
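
The lists are space-separated when an agent has more than one of any component. For example, an agent with two sources feeding a single channel and sink might start like this (the component names are arbitrary):

agent.sources=s1 s2
agent.channels=c1
agent.sinks=k1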

Tip

Downloading the example code

You can download the example code files from your account at http://www.packtpub.com for all the Packt Publishing books you have purchased. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

Each source, channel, and sink also has a unique name within the context of that agent. For example, if I'm going to transport my Apache access logs, I might define a channel named access. The configurations for this channel would all start with the agent.channels.access prefix. Each configuration item has a type property that tells Flume what kind of source, channel, or sink it is. In this case, we are going to use an in-memory channel whose type is memory. The complete configuration for the channel named access in the agent named agent would be:

agent.channels.access.type=memory

Any arguments to a source, channel, or sink are added as additional properties using the same prefix. The memory channel has a capacity parameter to indicate the maximum number of Flume events it can hold. Let's say we didn't want to use the default value of 100; our configuration would now look like this:

agent.channels.access.type=memory

agent.channels.access.capacity=200

Finally, we need to add the access channel name to the agent.channels property so that the agent knows to load it:

agent.channels=access
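
Putting those pieces together, the fragment of the property file that describes the access channel now reads as follows (this simply restates the settings above in one place):

agent.channels=access
agent.channels.access.type=memory
agent.channels.access.capacity=200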

Let's look at a complete example using the canonical Hello, World! example.

Starting up with "Hello, World!"

No technical book would be complete without a Hello, World! example. Here is the configuration file we'll be using:

agent.sources=s1

agent.channels=c1

agent.sinks=k1

agent.sources.s1.type=netcat

agent.sources.s1.channels=c1

agent.sources.s1.bind=0.0.0.0

agent.sources.s1.port=12345

agent.channels.c1.type=memory

agent.sinks.k1.type=logger

agent.sinks.k1.channel=c1

Here, I've defined one agent (called agent) who has a source named s1, a channel named c1, and a sink named k1.

The s1 source's type is netcat, which simply opens a socket listening for events (one line of text per event). It requires two parameters: a bind IP and a port number. In this example, we are using 0.0.0.0 for the bind address (the Java convention for listening on any address) and port 12345. The source configuration also has a parameter called channels (plural), which is the name of the channel(s) the source will append events to; in this case, c1. It is plural because you can configure a source to write to more than one channel; we just aren't doing that in this simple example.
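
If you did want a single source to feed two channels, you would list both names on the channels property. The following fragment is hypothetical and assumes a second channel named c2 had also been defined:

agent.sources.s1.channels=c1 c2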

The channel named c1 is a memory channel with a default configuration.

The sink named k1 is of the logger type. This is a sink that is mostly used for debugging and testing. It will log all events it receives from the configured channel (in this case, c1) at the INFO level using Log4j. Here, the channel keyword is singular because a sink can only be fed data from one channel.

Using this configuration, let's run the agent and connect to it using the Linux netcat utility to send an event.

First, explode the .tar archive of the binary distribution we downloaded earlier:

$ tar -zxf apache-flume-1.5.2-bin.tar.gz

$ cd apache-flume-1.5.2-bin

Next, let's briefly look at the help. Run the flume-ng command with the help command:

$ ./bin/flume-ng help

Usage: ./bin/flume-ng <command> [options]...

commands:

help display this help text

agent run a Flume agent

avro-client run an avro Flume client

version show Flume version info

global options:

--conf,-c <conf> use configs in <conf> directory

--classpath,-C <cp> append to the classpath

--dryrun,-d do not actually start Flume, just print the command

--plugins-path <dirs> colon-separated list of plugins.d directories. See the

plugins.d section in the user guide for more details.

Default: $FLUME_HOME/plugins.d

-Dproperty=value sets a Java system property value

-Xproperty=value sets a Java -X option

agent options:

--conf-file,-f <file> specify a config file (required)

--name,-n <name> the name of this agent (required)

--help,-h display help text

avro-client options:

--rpcProps,-P <file> RPC client properties file with server connection params

--host,-H <host> hostname to which events will be sent

--port,-p <port> port of the avro source

--dirname <dir> directory to stream to avro source

--filename,-F <file> text file to stream to avro source (default: std input)

--headerFile,-R <file> File containing event headers as key/value pairs on each new line

--help,-h display help text

Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first in the classpath.

As you can see, there are two main ways you can invoke the command (other than the simple help and version commands). We will be using the agent command. The use of avro-client will be covered later.

The agent command has two required parameters: a configuration file to use and the agent name (in case your configuration contains multiple agents).

Let's take our sample configuration and open an editor (vi in my case, but use whatever you like):

$ vi conf/hw.conf

Next, place the contents of the preceding configuration into the editor, save, and exit back to the shell.

Now you can start the agent:

$ ./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console

The -Dflume.root.logger property overrides the root logger in conf/log4j.properties to use the console appender.

If we didn't override the root logger, everything would still work, but the output would go to the log/flume.log file instead, based on the contents of the default configuration file. Of course, you can edit the conf/log4j.properties file and change the flume.root.logger property (or anything else you like). To change just the path or filename, you can set the flume.log.dir and flume.log.file properties in the configuration file or pass additional flags on the command line as follows:

$ ./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console -Dflume.log.dir=/tmp -Dflume.log.file=flume-agent.log

You might ask why you need to specify the -c parameter, as the -f parameter contains the complete relative path to the configuration. The reason for this is that the Log4j configuration file should be included on the class path.

If you left the -c parameter off the command, you'll see this error:

Warning: No configuration directory set! Use --conf <dir> to override.

log4j:WARN No appenders could be found for logger (org.apache.flume.lifecycle.LifecycleSupervisor).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

But you didn't do that, so you should see these key log lines:

2014-10-05 15:39:06,109 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent]

This line tells you that your agent starts with the name agent.

Usually you'd look for this line only to be sure you started the right configuration when you have multiple configurations defined in your configuration file.

2014-10-05 15:39:06,076 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/hw.conf

This is another sanity check to make sure you are loading the correct file, in this case our hw.conf file.

2014-10-05 15:39:06,221 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:s1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@442fbe47 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }

Once all the configurations have been parsed, you will see this message, which shows you everything that was configured. You can see s1, c1, and k1, and which Java classes are actually doing the work. As you probably guessed, netcat is a convenience for org.apache.flume.source.NetcatSource. We could have used the class name if we wanted. In fact, if I had my own custom source written, I would use its class name for the source's type parameter. You cannot define your own short names without patching the Flume distribution.
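
For example, referencing a custom source of your own would mean using its fully qualified class name as the type; the class name here is made up purely for illustration:

agent.sources.s1.type=com.example.flume.MyCustomSource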

2014-10-05 15:39:06,427 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/0.0.0.0:12345]

Here, we see that our source is now listening on port 12345 for the input. So, let's send some data to it.

Finally, open a second terminal. We'll use the nc command (you can use Telnet or anything else similar) to send the Hello World string and press the Return (Enter) key to mark the end of the event:

% nc localhost 12345

Hello World

OK

The OK message came from the agent after we pressed the Return key, signifying that it accepted the line of text as a single Flume event. If you look at the agent log, you will see the following:

2014-10-05 15:44:11,215 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 Hello World }

This log message shows you that the Flume event contains no headers (NetcatSource doesn't add any itself). The body is shown in hexadecimal along with a string representation (for us humans to read, in this case, our Hello World message).

If I send the following line and then press the Enter key, you'll get an OK message:

The quick brown fox jumped over the lazy dog.

You'll see this in the agent's log:

2014-10-05 15:44:57,232 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 54 68 65 20 71 75 69 63 6B 20 62 72 6F 77 6E 20 The quick brown }

The event appears to have been truncated. The logger sink, by design, limits the body content to 16 bytes to keep your screen from being filled with more than what you'd need in a debugging context. If you need to see the full contents for debugging, you should use a different sink, perhaps the file_roll sink, which would write to the local filesystem.
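
A minimal sketch of such a substitution, swapping the logger sink for a file_roll sink that writes complete event bodies to a local directory, might look like the following. The directory path is just an example, and sink.directory is the file_roll sink's setting for where the output files are written:

agent.sinks.k1.type=file_roll
agent.sinks.k1.channel=c1
agent.sinks.k1.sink.directory=/tmp/flume-events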

Summary

In this chapter, we covered how to download the Flume binary distribution. We created a simple configuration file that included one source writing to one channel, feeding one sink. The source listened on a socket for network clients to connect to and to send it event data. These events were written to an in-memory channel and then fed to a Log4j sink to become the output. We then connected to our listening agent using the Linux netcat utility and sent some string events to our Flume agent's source. Finally, we verified that our Log4j-based sink wrote the events out.

In the next chapter, we'll take a detailed look at the two major channel types you'll most likely use in your data processing workflows: the memory channel and the file channel.

We will also take a look at a new experimental channel, introduced in Version 1.5 of Flume, called the Spillable Memory Channel, which attempts to be a hybrid of the other two.

For each type, we'll discuss all the configuration knobs available to you, when and why you might want to deviate from the defaults, and most importantly, why to use one over the other.