Chapter 2. Getting Started with Your First Topology

This chapter is dedicated to guiding you through the steps to set up the environment for the execution of a Storm topology. The intent is to prepare the user sandbox and steer you toward executing some of the sample code and understanding the working of the various components. All the concepts will be accompanied by code snippets and a "try it yourself" section so that you are equipped to understand the components in a practical manner and are ready to explore and harness the power of this wonderful technology.

The topics that will be covered in this chapter are as follows:

· Storm topology and components

· Executing the sample Storm topology

· Executing the topology in distributed mode

By the end of the chapter, you will be able to understand the components and data flow in a topology, understand the simple word count topology, and execute it in the local and distributed modes. You will also be able to tweak the starter project topologies to add your own flavor to them.

Prerequisites for setting up Storm

The prerequisites for executing the setup and execution steps are listed here:

· For a local mode setup, you need Maven, Git, Eclipse, and Java

· For a distributed setup, you need the following:

o A Linux or Ubuntu setup; alternatively, a distributed setup can leverage PowerShell or Cygwin on Windows systems

o Having more than one system, or virtual machines created using VMware Player, would help

You can refer to the following links and follow the process laid out to set up the various open source components required to set up Storm and deploy the components explained in this segment of the book:

· For Java, https://java.com/en/download/index.jsp

· For Eclipse, https://www.eclipse.org/downloads/

· For Cygwin, http://cygwin.com/install.html

· For Git, https://help.github.com/articles/set-up-git

Components of a Storm topology

A Storm topology consists of two basic components: a spout and one or more bolts. These building blocks are tied together using streams; it is over these streams that endless arrays of tuples flow.

Let's discuss the topology with a simple analogy, as depicted in the diagram and explained thereafter:

[Figure: The roasted chips processing unit as an analogy for a Storm topology]

In our example topology, we have a big processing unit for roasted chips where the input, raw potatoes, is consumed by the spout, and there are various bolts such as a peeler bolt, slicer bolt, and roasting bolt that perform the tasks their names suggest. There are various assembly lines or workers that move the chips from the peeler unit to the slicer and beyond; in our case, we have streams to link and wire the spout and bolts with each other. Now, the basic unit of exchange between the peeler and the slicer is a peeled potato, and between the slicer units and the roasting units it is a sliced potato. This is analogous to a tuple, the datum of information exchange between spouts and bolts.

Let's take a closer look at the building blocks of the Storm topology.

Note

The basic unit of data interchange within Storm is called a tuple; this is sometimes also referred to as an event.

Spouts

A spout is the collection funnel of a topology; it feeds events or tuples into the topology. It can be considered as the input source to the Storm processing unit—the topology.

The spout reads messages from external sources such as a queue, file, port, and so on, and emits them into the stream, which in turn passes them to the bolts. It's the task of the Storm spout to track each event or tuple throughout its processing through the Directed Acyclic Graph (DAG). The Storm framework then generates either an acknowledgement or a failure notification based on the outcome of the execution of the tuple in the topology. This mechanism gives Storm its guaranteed processing feature. Based on the required functionality, spouts can be programmed or configured to be reliable or unreliable. A reliable spout plays back the failed events into the topology.

The following diagram depicts the same flow, graphically:

[Figure: A spout emitting tuples on streams to bolts A and C]

All Storm spouts are implemented to be able to emit tuples on one or more streams. As in the preceding diagram, a spout can emit tuples to both bolt A and bolt C.

Each spout should implement the IRichSpout interface. The following are the important methods to know in the context of a spout:

· nextTuple(): This is the method that keeps on polling the external source for new events; for instance, the queue in the preceding example. On every poll, if the method finds an event, it is emitted to the topology through a stream, and if there is no new event, the method simply returns.

· ack(): This method is called when the tuple emitted by the spout has been successfully processed by the topology.

· fail(): This method is called when a tuple emitted by the spout is not successfully processed within the specified timeout. In this case, for reliable spouts, the spout tracks each tuple using its messageId; the failed tuples are then re-emitted to the topology to be reprocessed. For instance, in the preceding figure, the failed tuple is emitted again.

For unreliable spouts, the tuples are not tracked using messageIds and the methods such as ack() and fail() don't hold any value as the spout doesn't track the tuples for successful processing. These topologies are identified as unreliable.

Note

IRichSpout is an interface provided by Storm that provides the details of the contracts or methods to be implemented by topology spouts.
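To make these contracts concrete, here is a minimal sketch of a reliable spout built on BaseRichSpout (an abstract class that implements IRichSpout). The in-memory queue standing in for an external source, the seed sentence, and the class name are illustrative assumptions, not code from the storm-starter project:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private ConcurrentLinkedQueue<String> queue;   // stands in for an external source such as a queue
    private Map<Object, String> pending;           // messageId -> payload, kept for replay on failure

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.queue = new ConcurrentLinkedQueue<String>();
        this.pending = new ConcurrentHashMap<Object, String>();
        queue.add("an apple a day keeps the doctor away");   // seed data in place of a real source
    }

    @Override
    public void nextTuple() {
        String sentence = queue.poll();
        if (sentence == null) {
            return;                                // no new event; simply return
        }
        String msgId = UUID.randomUUID().toString();
        pending.put(msgId, sentence);
        // Emitting with a messageId makes the spout reliable: Storm calls ack()/fail()
        // for this tuple once the topology finishes (or fails) processing it.
        collector.emit(new Values(sentence), msgId);
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);                     // processed successfully; forget the tuple
    }

    @Override
    public void fail(Object msgId) {
        queue.add(pending.remove(msgId));          // play the failed event back into the topology
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}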

Bolts

Bolts are the processing units of a topology. They are the components of the topology that perform one or more of the following tasks:

· Parsing

· Transformation

· Aggregation

· Joins

· Database interaction

The entire process being performed by the topology is generally divided into smaller tasks and subtasks, each preferably performed by a different bolt to exploit the power of the parallel distributed processing of Storm.

Let's look at the following figure that captures a real-time use case where the location coordinates from various airplanes are tracked and processed to ascertain whether they are moving on the correct trajectory:

[Figure: A flight-tracking topology with parse event, location, verify, and alert bolts]

Here, the flight location coordinates are sent by sensors in the plane, which are collated at log servers and fed into a Storm topology. The Storm topology is broken into the following bolts that can act on the tuples emitted by the spout:

· The parse event bolt: This bolt filters and transforms the event emitted by the spout. It converts the information into a decipherable format.

· The location bolt: This is the bolt that extracts the location coordinates from the tuples it receives from the parse bolt and then sends them across to the next bolt.

· The verify bolt: This is the bolt that verifies the location coordinates sent by the location bolt against the predefined trajectory of the plane, and if it detects deviation, it sends a tuple to the alert bolt.

· The alert bolt: This bolt is the actor that informs the external systems, such as the air controller in our case, about the anomaly or deviation detected in the flight path.

Owing to the nature of real-time use cases such as the one depicted in the preceding figure, speed and accuracy of computation are of utmost importance, which is what makes Storm a strong technological choice for the implementation of such solutions.

The total processing logic gets broken down into smaller tasks that are executed in bolts; configuring the number of tasks and the parallelism of bolts lets engineers attain the right performance for the solution, as sketched in the following snippet.
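The following fragment is a hypothetical sketch (the bolt classes, stream names, and numbers are illustrative assumptions, not code from the book) of how parallelism hints and task counts could be wired for the flight-tracking topology described earlier:

// The parallelism hint is the number of executors; setNumTasks allows more tasks than executors
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("flight-events", new FlightEventSpout(), 2);
builder.setBolt("parse", new ParseEventBolt(), 4).shuffleGrouping("flight-events");
builder.setBolt("location", new LocationBolt(), 4).shuffleGrouping("parse");
builder.setBolt("verify", new VerifyBolt(), 8)
       .setNumTasks(16)                            // extra tasks leave room to rebalance later
       .fieldsGrouping("location", new Fields("flight-id"));
builder.setBolt("alert", new AlertBolt(), 2).shuffleGrouping("verify");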

One bolt can listen to multiple streams and it can emit to multiple other bolts on different streams. As depicted in the figure in the Spouts section:

· Bolt-A emits to Bolt-B and Bolt-C

· Bolt-D subscribes to streams from Bolt-C and Bolt-B

The common interfaces provided by Storm to be implemented by user-defined bolts are as follows:

· IRichBolt

· IBasicBolt

The difference between these two interfaces lies in whether reliable messaging and transactional support are required.

The main methods used by the bolts are as follows:

· prepare(): This is the method that is called when the bolt is initialized. Fundamentally, the Storm topology runs forever and the bolt once initialized will not terminate till the topology is killed. This method is generally used to initialize connections and read other static information, which is required during the entire life cycle of the bolt.

· execute(): This is the method that performs the processing logic defined for the bolt. It is executed once for every tuple.
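As a minimal sketch of these contracts, the following bolt is built on BaseRichBolt (which implements IRichBolt); the word-counting logic and the class name are illustrative assumptions rather than the storm-starter code:

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCounterBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String, Integer> counts;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // Called once when the bolt is initialized; open connections and load static data here
        this.collector = collector;
        this.counts = new HashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple tuple) {
        // Called for every incoming tuple
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        count = (count == null) ? 1 : count + 1;
        counts.put(word, count);
        collector.emit(tuple, new Values(word, count)); // anchor the emitted tuple to its input
        collector.ack(tuple);                           // acknowledge successful processing
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}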

Streams

A stream can be defined as a sequence of tuples or events that is unbounded by nature. Streams are generally created in a parallel and distributed manner across the topology. Streams can be called the wiring or information flow channels between the spout and bolts. They are carriers of unprocessed, semiprocessed, and processed information to and from various task-performing components such as bolts and spouts. Streams are configured while defining the topology using a schema that gives names to the fields in the stream's tuple.

Tuples – the data model in Storm

A tuple is the basic and constituent data structure in Storm. It's a named list of values that starts its journey from the spout. It is emitted onto a stream and passed to bolts, and then from bolts to other bolts, where various stages of processing are executed. On successful completion of all intended processing, as per the topology definition, the tuples are acked back to the spout.
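As a small illustrative fragment (assuming a stream whose schema declares the fields "word" and "count"), the values of a tuple can be read by position or by field name:

String word = tuple.getString(0);                     // access by position
Integer count = tuple.getIntegerByField("count");     // access by declared field name
// The schema behind these names is declared by the emitting component, for example:
// declarer.declare(new Fields("word", "count"));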

Executing a sample Storm topology – local mode

Before we start this section, the assumption is that you have gone through the prerequisites and installed the expected components.

WordCount topology from the Storm-starter project

To understand the components described in the previous section, let's download the Storm-starter project and execute a sample topology:

1. The Storm-starter project can be downloaded using the following Git command:

Linux-command-Prompt $ sudo git clone git://github.com/apache/incubator-storm.git && cd incubator-storm/examples/storm-starter

2. Next, you need to import the project into your Eclipse workspace:

1. Start Eclipse.

2. Click on the File menu and select the Import wizard.

3. From the Import wizard, select Existing Maven Projects.

[Figure: Selecting Existing Maven Projects in the Eclipse Import wizard]

4. Select the pom.xml file of the storm-starter project by specifying its location as <download-folder>/starter/incubator-storm/examples/storm-starter.

5. Once the project has been successfully imported, the Eclipse folder structure will look like the following screenshot:

[Figure: Eclipse folder structure of the imported storm-starter project]

6. Execute the topology using the run command and you should be able to see the output as shown in the following screenshot:

[Figure: Console output of the WordCount topology executed in local mode]

To understand the functioning of the topology, let's take a look at the code and understand the flow and functioning of each component in the topology:

// instantiates the new builder object

TopologyBuilder builder = new TopologyBuilder();

// Adds a new spout of type "RandomSentenceSpout" with a parallelism hint of 5

builder.setSpout("spout", new RandomSentenceSpout(), 5);

Starting with the main function, in the WordCountTopology.java class, we find the TopologyBuilder object called builder; this is important to understand as this is the class that provides us with a template to define the topology. This class exposes the API to configure and wire various spouts and bolts into a topology, which is essentially a Thrift structure at the end.

In the preceding code snippet, we created a TopologyBuilder object and used the template to perform the following:

· setSpout (RandomSentenceSpout): This spout generates random sentences. Please note that we are using a property called the parallelism hint, which is set to 5 here. This is the property that identifies how many instances of this component will be spawned at the time of submitting the topology. In our example, we will have five instances of the spout.

· setBolt: We use this method to add two bolts to the topology: SplitSentenceBolt, which splits the sentence into words, and WordCountBolt, which counts the words.

· Other noteworthy items in the topology wiring are shuffleGrouping and fieldsGrouping; we shall discuss these in detail in the next chapter. For now, understand that these are the components that control the routing of tuples to the various bolts in the topology.
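For reference, the bolt wiring in WordCountTopology.java looks roughly as follows (the exact parallelism values may vary between storm-starter versions):

//Adds the sentence-splitting bolt; tuples are distributed randomly across its 8 executors
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
//Adds the word-counting bolt; fieldsGrouping ensures the same word always reaches the same executor
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

The topology is then submitted either to an in-process LocalCluster (local mode) or through StormSubmitter (distributed mode); the main() method does this along the following lines:

Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
    // Distributed mode: submit to the cluster under the name passed on the command line
    conf.setNumWorkers(3);
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
    // Local mode: run an in-process cluster for a few seconds and then shut it down
    conf.setMaxTaskParallelism(3);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("word-count", conf, builder.createTopology());
    Thread.sleep(10000);
    cluster.shutdown();
}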

Executing the topology in the distributed mode

To set up Storm in distributed mode, we will need to perform the following steps.

Set up Zookeeper (V 3.3.5) for Storm

The coordination of a Storm topology is maintained by a Zookeeper cluster. The utilization of Zookeeper is not very high, as it just maintains the runnable state of the Storm cluster. In most cases, a single Zookeeper node should suffice, but in production scenarios, at least a three-node Zookeeper cluster is recommended so that a single node doesn't become a single point of failure.

For reliable Zookeeper service, deploy Zookeeper in a cluster known as an ensemble. As long as the majority of the ensemble is up, the service will be available. One of the nodes in the ensemble is automatically selected as a leader and others as followers. If the leader goes down, one of the follower nodes becomes the leader.

Perform the following steps on all the machines that will be part of the Zookeeper ensemble to set up the Zookeeper cluster:

1. Download the most recent stable release (version 3.3.5) from the Apache Zookeeper site.

2. Create a zookeeper directory under /usr/local:

sudo mkdir /usr/local/zookeeper

3. Extract the downloaded TAR file to the /usr/local location. Use the following command:

sudo tar -xvf zookeeper-3.3.5.tar.gz -C /usr/local/zookeeper

4. Zookeeper needs a directory to store its data. Create /usr/local/zookeeper/tmp to store this data:

sudo mkdir -p /usr/local/zookeeper/tmp

5. Create a configuration file, zoo.cfg, under /usr/local/zookeeper/zookeeper-3.3.5/conf. The following properties will go in it (a sample file is shown after this list):

o tickTime: This is the number of milliseconds of each tick (for example, 2000).

o initLimit: This is the number of ticks that the initial synchronization phase can take (for example, 5).

o syncLimit: This is the number of ticks that can pass between sending a request and getting an acknowledgement (for example, 2).

o dataDir: This is the directory where the snapshot is stored (for example, /usr/local/zookeeper/tmp).

o clientPort: This is the port on which the Zookeeper clients connect (for example, 2182).

o server.id=host:port:port: Every machine that is part of the Zookeeper ensemble should know about every other machine in the ensemble. This is accomplished with a series of lines of the server.id=host:port:port form (for example, server.1=<IP_ADDRESS_OF_ZOOKEEPER_NODE_1>:2888:3888).
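Putting these properties together, a sample zoo.cfg for a three-node ensemble (with placeholder host addresses, assuming the example values given in the list above) could look like this:

tickTime=2000
initLimit=5
syncLimit=2
dataDir=/usr/local/zookeeper/tmp
clientPort=2182
server.1=<IP_ADDRESS_OF_ZOOKEEPER_NODE_1>:2888:3888
server.2=<IP_ADDRESS_OF_ZOOKEEPER_NODE_2>:2888:3888
server.3=<IP_ADDRESS_OF_ZOOKEEPER_NODE_3>:2888:3888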

6. Repeat the preceding steps or copy the distribution to other machines that will be part of the Zookeeper cluster.

7. Create a file with the name myid in the directory specified by the dataDir property. The myid file consists of a single line containing only the text of that machine's ID, matching the number used in its server.id entry in zoo.cfg. So, the myid file of server 1 will contain the text 1 and nothing else. The ID must be unique within the ensemble and should have a value between 1 and 255. In this case, the file can be created with vi /usr/local/zookeeper/tmp/myid.

8. Edit the ~/.bashrc file and add an environment variable for the Zookeeper home and add its bin directory to the PATH environment variable:

[Figure: ~/.bashrc entries for Zookeeper]
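The screenshot is not reproduced here; assuming the install location used in the preceding steps, the ~/.bashrc entries would look roughly like the following:

export ZOOKEEPER_HOME=/usr/local/zookeeper/zookeeper-3.3.5
export PATH=$PATH:$ZOOKEEPER_HOME/bin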

9. Source the ~/.bashrc file after making changes. This step is required to make sure that the changes that are made to bashrc are applied to the current terminal session:

source ~/.bashrc

10. Start the Zookeeper daemon on each node by executing the following command from $ZOOKEEPER_HOME:

sudo -E bin/zkServer.sh start

11. Stop the Zookeeper daemon on each node by executing the following command from $ZOOKEEPER_HOME:

sudo -E bin/zkServer.sh stop

12. The Zookeeper status can be checked by running the following command from $ZOOKEEPER_HOME:

sudo -E bin/zkServer.sh status

The output for the different modes is as follows:

· If running in the standalone mode (only a single machine is part of the Zookeeper ensemble cluster), the following output will be seen on the console:

[Figure: zkServer.sh status output in standalone mode]

· If running in the clustered mode, the following output is seen on the leader node:

[Figure: zkServer.sh status output on the leader node]

· If running in the clustered mode, the following output is seen on the follower node:

[Figure: zkServer.sh status output on a follower node]

By default, the Zookeeper log (zookeeper.out) is created at the same location from where its instance is started. This completes the Zookeeper cluster setup.

Setting up Storm in the distributed mode

Perform the following steps to set up Storm in distributed mode:

1. Download the Storm-0.9.2-incubating.zip package from the GitHub Storm site.

2. Create the directories storm and storm/tmp under /usr/local:

sudo mkdir -p /usr/local/storm/tmp

3. Create the following directory for logs:

sudo mkdir -p /mnt/abc_logs/storm/storm_logs

4. Extract the ZIP file on the Nimbus and worker machines into the directory at /usr/local/storm:

sudo unzip -d /usr/local/storm/ storm-0.9.2-incubating.zip

5. Make the following changes at /usr/local/storm/storm-0.9.2-incubating/conf/storm.yaml:

o storm.zookeeper.servers: This is a list of the hosts in the Zookeeper cluster for the Storm cluster:

storm.zookeeper.servers:
- "<IP_ADDRESS_OF_ZOOKEEPER_ENSEMBLE_NODE_1>"
- "<IP_ADDRESS_OF_ZOOKEEPER_ENSEMBLE_NODE_2>"

o storm.zookeeper.port: This is the port on which the Zookeeper cluster is running:

storm.zookeeper.port: 2182

o storm.local.dir: The Nimbus and the supervisor require a location on the local disk to store a small amount of data related to configurations and execution details of the topology. Please make sure to create the directory and assign read/write permissions on all Storm nodes. For our installation, we are going to create this directory in the /usr/local/storm/tmp location:

storm.local.dir: "/usr/local/storm/tmp"

o nimbus.host: The nodes need to know which machine is the master in order to download topology jars and confs. This property is used for this purpose:

nimbus.host: "<IP_ADDRESS_OF_NIMBUS_HOST>"

o java.library.path: This is the load path for the native libraries that Storm uses (ZeroMQ and JZMQ). The default of /usr/local/lib:/opt/local/lib:/usr/lib should be fine for most installations, so validate the libraries in the previously mentioned locations before moving forward.

o storm.messaging.netty: Storm's Netty-based transport has been overhauled to significantly improve performance through better utilization of thread, CPU, and network resources, particularly in cases where message sizes are small. In order to provide Netty support, the following configurations need to be added:

storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

o The storm.yaml snippet from our Storm cluster installation is as follows:

#To be filled in for a storm configuration
storm.zookeeper.servers:
- "nim-zkp-flm-3.abc.net"
storm.zookeeper.port: 2182
storm.local.dir: "/usr/local/storm/tmp"
nimbus.host: "nim-zkp-flm-3.abc.net"
topology.message.timeout.secs: 60
topology.debug: false
topology.optimize: true
topology.ackers: 4

storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

6. Set the STORM_HOME environment variable in the ~/.bashrc file and add Storm's bin directory to the PATH environment variable. This is done so that the Storm binaries can be executed from any location.
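Assuming the extraction location used in the preceding steps, the entries would be along these lines:

export STORM_HOME=/usr/local/storm/storm-0.9.2-incubating
export PATH=$PATH:$STORM_HOME/bin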

7. Copy the storm.yaml file to the bin folder of the Storm installation on the Nimbus machine using the following command:

sudo cp /usr/local/storm/storm-0.9.2-incubating/conf/storm.yaml /usr/local/storm/storm-0.9.2-incubating/bin/

Launching Storm daemons

Now that the Storm cluster is set, we will be required to start three processes on respective Storm nodes. They are as follows:

· Nimbus: Start Nimbus as the background process on the machine identified as the master node by running the following command from $STORM_HOME:

sudo -bE bin/storm nimbus

· Supervisor: Supervisors can be started in a similar way Nimbus is started. Run the following command from $STORM_HOME:

sudo -bE bin/storm supervisor

· UI: The Storm UI is a web application to check the Storm cluster, which contains the Nimbus/Supervisor status. It also lists all the running topologies and their details. The UI can be enabled by using the following command from $STORM_HOME:

sudo -bE bin/storm ui

The UI can be accessed through http://<IP_ADDRESS_OF_NIMBUS>:8080.

Executing the topology from Command Prompt

Once the UI is visible and all the daemons are started, the topology can be submitted on Nimbus using the following command:

storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology WordCount -c nimbus.host=localhost

The Storm UI with the WordCount topology running in distributed mode is shown here. It depicts the topology state, uptime, and other details (we shall discuss the features of the UI in detail in a later chapter). We can kill the topology from the UI.

[Figure: The Storm UI showing the WordCount topology running in distributed mode]

Tweaking the WordCount topology to customize it

Now that we have deployed the WordCount topology in distributed mode, let's tweak the code in the bolts a bit to write the word counts to a file. To achieve this, we will proceed with the following steps:

1. We intend to create a new bolt, FileWriterBolt, to achieve this. Open WordCountTopology.java and add the following class to it:

public static class FileWriterBolt extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Append the word and its running count to a file in the user's home directory.
        // FileOutputStream does not expand "~", so the home directory is resolved explicitly.
        OutputStream ostream;
        try {
            ostream = new FileOutputStream(System.getProperty("user.home") + "/wordCount.txt", true);
            ostream.write((word + " " + count + "\n").getBytes());
            ostream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

2. Next, we have to make changes to the main() method to use this new bolt instead of the WordCount bolt; here is the snippet:

// instantiates the new builder object
TopologyBuilder builder = new TopologyBuilder();

// Adds a new spout of type "RandomSentenceSpout" with a parallelism hint of 5
builder.setSpout("spout", new RandomSentenceSpout(), 5);

//Adds a new bolt to the topology of type "SplitSentence" with parallelism of 8
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");

//Adds the new "FileWriterBolt" in place of the WordCount bolt, with parallelism of 12
builder.setBolt("count", new FileWriterBolt(), 12).fieldsGrouping("split", new Fields("word"));

3. Next, you can execute the topology using Eclipse by running it as a Java application, and the output will be saved into a file called wordCount.txt in your home directory.

4. To run in distributed mode, use the following steps:

1. Compile the topology changes to generate a new storm-starter JAR using the following command line:

mvn clean install

2. Copy storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar from the target folder under the starter project to Nimbus, let's say, at /home/admin/topology/.

3. Submit the topology using the following command:

storm jar /home/admin/topology/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology WordCount -c nimbus.host=localhost

5. The output will be the same as that of the WordCount topology shown in the figure in the preceding section.

Quiz time

Q.1. State whether the following statements are true or false:

1. All Storm topologies are reliable.

2. A topology generally has multiple spouts.

3. A topology generally has multiple bolts.

4. One bolt can emit on only one stream.

Q.2. Fill in the blanks:

1. _______________ is the template to create the topology.

2. _______________ specifies how many instances of a particular bolt or spout are spawned.

3. The ___________ daemon of Storm is analogous to the job tracker of Hadoop.

Q.3. Perform the following task:

1. Make changes to the WordCount topology of the Storm-starter project to RandomSentenceSpout so that it's able to read sentences from a file at a specified location.

Summary

In this chapter, we have set up a Storm cluster. You were introduced to the various building blocks of a Storm topology such as bolts, spouts, and the wiring template, the topology builder. We executed and understood the WordCount topology and also made some amendments to it.

In the next chapter, you will read and understand about stream groupings, anchoring, and acking. That will also lead us to reliable and non-reliable mechanisms in the topologies under the Storm framework.