
Chapter 11. Distributed Cache and CEP with Storm

In this chapter, we will learn about the need for distributed caching in conjunction with Storm and the integration of widely used options with Storm. We will also touch upon the Complex Event Processing (CEP) engines in collaboration with Storm.

In this chapter, we will cover the following topics:

· The need for distributed caches in the Storm framework

· Introduction to memcached

· Building a topology with caches

· Introduction to CEP and Esper

At the end of this chapter, you should be able to apply CEP and cache in conjunction with Storm to solve real-time use cases.

The need for distributed caching in Storm

Now that we have explored Storm enough to understand all its strengths, let's touch on one of its biggest weaknesses: the lack of a shared cache, a common store in memory that all tasks running across the workers on various nodes in the Storm cluster can access and write to.

The following figure illustrates a three-node Storm cluster where we have two workers running on each of the supervisor nodes:

[Figure: A three-node Storm cluster with two workers running on each supervisor node]

As depicted in the preceding figure, each worker has its own JVM where data can be stored and cached. However, what we are missing here is a cache layer that is shared by the workers on a supervisor as well as across the supervisors. The following figure depicts the need for what we are referring to:

[Figure: A shared cache layer accessible from all the supervisor nodes]

The preceding figure depicts the need for a shared caching layer where common data can be placed, which is referable from all nodes. These are very valid use cases because in production, we encounter scenarios such as the following:

· We have a lot of read-only reference dimensional data, which we would want in one place instead of having it replicated and updated at each supervisor level

· Sometimes, we have transactional data in certain use cases, which are to be read and updated by all the workers; for example, when counting certain events, the count has to be kept in a common location

This is where the layer of common shared cache that is accessible across all supervisor nodes comes in.

Introduction to memcached

Memcached is a very simple in-memory key-value store; we can think of it as an in-memory hash map. It can be used in conjunction with the Storm supervisors to serve as a common memory store that all the Storm workers on various nodes in the Storm cluster can access for read/write operations.

Memcached has the following components:

· The memcached server

· The memcache client

· The hashing algorithm (client-based implementation)

· The server algorithm for data retention

Memcached uses a Least Recently Used (LRU) policy to discard elements from the cache. This means that the items that have not been referenced for the longest time are the first ones to be removed from the cache. These items are said to have expired from the cache, and if they are referenced after expiry, they are reloaded from stable storage.
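To make the eviction policy concrete, here is a minimal Java sketch of an LRU map built on LinkedHashMap. This is only an illustration of the LRU idea, not part of memcached itself:

import java.util.LinkedHashMap;
import java.util.Map;

// Minimal LRU cache sketch: evicts the entry referenced longest ago once capacity is exceeded
public class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    public LruCacheSketch(int capacity) {
        super(16, 0.75f, true); // accessOrder = true reorders entries on every get/put
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity; // drop the least recently used entry
    }
}

With a capacity of 2, putting keys a and b, reading a, and then putting c evicts b, since a was referenced more recently.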

The following is the flow of how entries are loaded and retrieved from or through a cache:

[Figure: The flow of loading entries into the cache and serving cache hits and cache misses]

The preceding figure depicts the scenarios of cache hit and cache miss, where certain items expire as per the LRU algorithm. The scenarios in the preceding figure are as follows:

· When the application starts, the cache is loaded with the data from the stable storage, in our case, from the database

· When we request data from the cache, one of two scenarios can occur:

o Cache hit: This is where the data we request exists on the cache server and in this case, the request is served from the cache

o Cache miss: This is where the data requested doesn't exist in the cache server, and in this case, the data is fetched from the database into the cache and then the request is serviced from the cache

Now we understand how the cache functions and why it is needed in the context of Storm-based solutions.
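The cache-miss path described previously is commonly implemented as a cache-aside read. The following is a minimal sketch of such a helper method; it assumes the MemCacheClient wrapper developed later in this chapter and a hypothetical loadFromDatabase(key) lookup:

// Cache-aside read: serve from the cache on a hit; on a miss, load from the
// database and repopulate the cache before returning the value.
public Object readThroughCache(MemCacheClient cache, String key, int expirySeconds) {
    Object value = cache.getMemcachedValue(key);            // cache hit if non-null
    if (value == null) {                                    // cache miss
        value = loadFromDatabase(key);                      // hypothetical database lookup
        if (value != null) {
            cache.addToMemCache(key, expirySeconds, value); // repopulate the cache
        }
    }
    return value;
}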

Setting up memcache

The following are the steps required to download, build, and install memcached:

wget http://memcached.org/latest
tar -zxvf memcached-1.x.x.tar.gz
cd memcached-1.x.x
./configure && make && make test && sudo make install
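The Java snippet that follows uses the spymemcached client (the MemcachedClient class). Assuming a Maven-based project, its dependency can be declared as follows (the version shown is a placeholder):

<dependency>
  <groupId>net.spy</groupId>
  <artifactId>spymemcached</artifactId>
  <version> ... </version>
</dependency>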

The following is the code snippet that connects to the memcached server and provides functions to store data in and retrieve data from the cache:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Properties;
import java.util.concurrent.Future;

import net.spy.memcached.MemcachedClient;
import net.spy.memcached.OperationTimeoutException;
import org.slf4j.Logger; // assuming LogUtils (the book's logging helper) returns an SLF4J logger

public class MemCacheClient {

    private static MemcachedClient client = null;
    private static final Logger logger = LogUtils.getLogger(MemCacheClient.class);

    /**
     * Constructor that accepts the cache properties as a parameter and initialises
     * the client object accordingly.
     * @param properties the cache connection properties
     * @throws Exception if the client cannot be initialised
     */
    public MemCacheClient(Properties properties) throws Exception {
        super();
        try {
            if (null == client) {
                // The host and port are placeholders; in a real deployment read them
                // from the properties object passed in
                client = new MemcachedClient(new InetSocketAddress("102.23.34.22", 5454));
            }
        } catch (IOException e) {
            if (null != client)
                shutdown();
            throw new Exception("Error while initiating MemCacheClient", e);
        }
    }

    /**
     * Shut down the client and nullify it
     */
    public void shutdown() {
        logger.info("Shutting down memcache client ");
        client.shutdown();
        client = null;
    }

    /**
     * This method sets a value in the cache with a specific key and timeout
     * @param key the unique key to identify the value
     * @param timeOut the expiry interval (in seconds) after which the value is evicted
     * @param val the value to be cached
     * @return a Future holding the outcome of the set operation
     */
    public Future<Boolean> addToMemCache(String key, int timeOut, Object val) {
        if (null != client) {
            Future<Boolean> future = client.set(key, timeOut, val);
            return future;
        } else {
            return null;
        }
    }

    /**
     * Retrieves and returns the value object against the key passed in as a parameter
     * @param key the key to look up
     * @return the cached value, or null if it is absent or the lookup times out
     */
    public Object getMemcachedValue(String key) {
        if (null != client) {
            try {
                return client.get(key);
            } catch (OperationTimeoutException e) {
                logger.error("Error while fetching value from memcache server for key " + key, e);
                return null;
            }
        } else {
            return null;
        }
    }
}

Once you have coded the preceding snippet, you will have built the mechanism to create the cache client, load data into the cache, and retrieve values from it. So, any Storm bolt that needs access to the cache can use the common layer created by memcached through interactions with this client.
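As a quick usage illustration (a hedged sketch; the key, value, and expiry shown are hypothetical), the wrapper can be exercised as follows:

import java.util.Properties;

public class MemCacheClientDemo {
    public static void main(String[] args) throws Exception {
        Properties cacheProps = new Properties();             // connection properties (placeholders)
        MemCacheClient cacheClient = new MemCacheClient(cacheProps);

        // Store a reference-data entry with a 300-second expiry
        cacheClient.addToMemCache("account:1234", 300, "premium-tier");

        // Read it back; a null return indicates a cache miss or a lookup timeout
        Object tier = cacheClient.getMemcachedValue("account:1234");
        System.out.println("Fetched from cache: " + tier);

        cacheClient.shutdown();
    }
}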

Building a topology with a cache

Once we have the basic cache framework in place, it's very easy to plug it into bolts and reference data from the cache or update the cache from them. Here is a snippet of a cache reader bolt:

public class MyCacheReaderBolt extends BaseBasicBolt {

    MyCacheReader cacheReader;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        try {
            // MyCacheReader wraps the memcached client created in the previous section
            cacheReader = new MyCacheReader();
        } catch (Exception e) {
            logger.error("Error while initializing Cache", e);
        }
    }

    /**
     * Called whenever a new tuple is received by this bolt. Responsible for
     * emitting the cache-enriched event onto the output stream
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        logger.info("execute method :: Start ");
        try {
            // parseEvent is a helper (assumed from the larger project) that converts
            // the raw string in the tuple into an Event object
            Event event = parseEvent(tuple.getString(0));
            if (null != event) {
                populateEventFromCache(event);
                collector.emit(outputStream, new Values(event));
            } else {
                logger.warn("Event not parsed :: " + tuple.getString(0));
            }
        } catch (Exception e) {
            logger.error("Error in execute() ", e);
        }
        logger.info("execute method :: End ");
    }

    private void populateEventFromCache(Event event) {
        long t1 = System.currentTimeMillis();
        // the event's hash code is used as the cache lookup key here
        HashMap fetchMap = (HashMap) cacheReader.get(event.hashCode());
        if (null != fetchMap) {
            event.setAccountID(Integer.parseInt((String) fetchMap.get("account_id")));
            logger.debug("Populating event " + event + " using cache " + fetchMap);
        } else {
            logger.debug("No matching event found in cache.");
        }
        logger.info("Time to fetch from cache=" + (System.currentTimeMillis() - t1) + " msec");
    }

    /**
     * Declares output streams and tuple fields emitted from this bolt
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String stormStreamName = logStream.getName() + "_" + eventType;
        declarer.declareStream(stormStreamName, new Fields(stormStreamName));
        logger.debug("Topology : " + topology.getTopologyName() + ", Declared output stream : " + stormStreamName + ", Output field : " + stormStreamName);
    }
}

The preceding code snippet demonstrates a bolt that reads an event from the stream, enriches it with dimensional data fetched from memcached, and emits the enriched event onto the stream feeding the subsequent bolts in the topology DAG.
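Such a bolt can be wired into a topology like any other bolt. The following is a minimal sketch; the spout class and component names are assumptions made for illustration:

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("eventSpout", new MyEventSpout());            // hypothetical source of raw events
    builder.setBolt("cacheReaderBolt", new MyCacheReaderBolt(), 2) // two executors reading from the shared cache
           .shuffleGrouping("eventSpout");
    StormSubmitter.submitTopology("cacheEnrichedTopology", new Config(), builder.createTopology());
}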

Introduction to the complex event processing engine

There are two terms that are generally used in conjunction; they are Complex Event Processing (CEP) and Event Stream Processing (ESP).

Well, in theory, these are parts of a technical paradigm that allows us to build applications with dramatic, real-time analytics over streaming data. They let us process incoming events at a very fast rate and execute SQL-like queries on top of the stream of events to generate real-time histograms.

We can think of CEP as an inversion of the traditional database. In the case of a traditional DBMS or RDBMS, we have the data stored and we then run SQL queries over it to arrive at results, while in the case of CEP, the queries are predefined or stored and we run the data through them.

We can envision this with an example: let's say I am running a department store and I would like to know the highest-selling item in the last one hour. The query we are about to execute is pretty fixed in nature, but the input data isn't constant; it changes with each sale transaction. Similarly, let's say I run a stock holding company and would like to know the top 10 performers over the last 2 minutes, refreshed every 5 seconds.

[Figure: A 2-minute sliding window over the stock ticker, sliding every 5 seconds]

The preceding figure depicts the stock ticker use case where we have a sliding window of 2 minutes and the stock ticker is sliding every 5 seconds. We have many practical use cases for this nowadays, such as:

· Fraud detection patterns against Point of Sale (POS) transactions

· Top N in any segment

· The application of deep learning patterns to stream data from any source
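To make the stock ticker use case above concrete, an Esper EPL statement for it could look roughly like the following. This is a sketch only; the StockTick event type and its symbol, price, and volume fields are assumptions:

select symbol, sum(price * volume) as turnover
from StockTick.win:time(2 min)
group by symbol
output snapshot every 5 seconds
order by turnover desc
limit 10

The win:time(2 min) view keeps a sliding 2-minute window of ticks, while output snapshot every 5 seconds emits the current top performers at a fixed 5-second cadence.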

Now, having understood CEP and its need at a high level, let's touch upon its high-level components:

· Event data: This is the operand in every CEP system; CEP is essentially event-driven

· Event processing language: This is the tool that facilitates the framing of queries to be executed on the data

· Listeners: These are the components that actually execute the query and perform the action on the arrival of the event into the system

Esper

Esper is one of the leading CEP engines and is available both as open source (GPL) and under an enterprise license. The package can be downloaded from http://www.espertech.com/download/, and if you are attempting to execute a Maven-based Esper project, the dependency can be added as follows:

<dependency>

<groupId>com.espertech</groupId>

<artifactId>esper</artifactId>

<version> ... </version>

</dependency>

Ref: Espertech.com

The next obvious question could be why we want to use Esper-CEP in conjunction with Storm. Well, Esper has some unique capabilities that complement Storm and let us apply its EQL facility to the results drawn from Storm. The following are the complementary features that lead to this choice:

· Throughput: Complementing the capability of Storm, Esper also has a very high throughput and can process from 1K to 100K messages per second.

· Latency: Esper has the ability to execute EQL statements and act on their results at very low latency; in most cases, this is of the order of milliseconds.

· Computations: These refer to the ability to perform functions such as pattern detection, complex queries based on aggregates, and correlation over time, all over time-sliced windows of streaming data (a short EPL pattern sketch follows this list).
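As an illustration of the pattern-detection capability mentioned above, an EPL pattern statement could look roughly like the following. This is a sketch under assumed names; the StockTick event type and its symbol and price fields are hypothetical:

select * from pattern [
  every low = StockTick(price < 10)
    -> (high = StockTick(symbol = low.symbol, price > 20) where timer:within(5 min))
]

This would fire whenever a tick below 10 for a symbol is followed, within 5 minutes, by a tick above 20 for the same symbol.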

Getting started with Esper

Before we start combining Esper and Storm, let's try a small do-it-yourself exercise on Esper alone to understand its structural components as well as its wiring.

Let's build a case where we attempt to flag Roulette wins whose prize amounts average above 10,000.

We expect you to download the Esper bundle from EsperTech (http://www.espertech.com/community/) and add it to your project before starting the coding; alternatively, you can use the Maven dependency mentioned in the preceding section.

The following is the code snippet of the Esper event—in our example, this is CasinoWinEvent, a value object where we store the name of the game, the prize amount, and the timestamp:

public static class CasinoWinEvent {

    String game;
    Double prizeAmount;
    Date timeStamp;

    public CasinoWinEvent(String s, double p, long t) {
        game = s;
        prizeAmount = p;
        timeStamp = new Date(t);
    }

    public double getPrizeAmount() {
        return prizeAmount;
    }

    public String getGame() {
        return game;
    }

    public Date getTimeStamp() {
        return timeStamp;
    }

    @Override
    public String toString() {
        return "Game: " + game + " prize: " + prizeAmount + " time: " + timeStamp.toString();
    }
}

Once we have the value object in place, the next step is to instantiate the Esper engine and listener and wire in all the pieces together:

import java.util.Date;
import java.util.Random;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

public class myEsperMain {

    private static Random generator = new Random();

    public static void GenerateRandomCasinoWinEvent(EPRuntime cepRT) {
        double prizeAmount = (double) generator.nextInt(10);
        long timeStamp = System.currentTimeMillis();
        String game = "Roulette";
        CasinoWinEvent casinoEvent = new CasinoWinEvent(game, prizeAmount, timeStamp);
        System.out.println("Sending Event:" + casinoEvent);
        cepRT.sendEvent(casinoEvent);
    }

    public static class CEPListener implements UpdateListener {
        public void update(EventBean[] newData, EventBean[] oldData) {
            System.out.println("Event received: " + newData[0].getUnderlying());
        }
    }

    public static void main(String[] args) {
        // The Configuration is meant only as an initialization-time object.
        Configuration cepConfig = new Configuration();
        cepConfig.addEventType("CasinoEvent", CasinoWinEvent.class.getName());
        EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine", cepConfig);
        EPRuntime cepRT = cep.getEPRuntime();
        EPAdministrator cepAdm = cep.getEPAdministrator();
        // The filter uses the 'game' property of CasinoWinEvent to select Roulette events
        EPStatement cepStatement = cepAdm.createEPL(
                "select * from CasinoEvent(game='Roulette').win:length(2) "
                + "having avg(prizeAmount) > 10000.0");
        cepStatement.addListener(new CEPListener());
        // We generate a few ticks...
        for (int i = 0; i < 5; i++) {
            GenerateRandomCasinoWinEvent(cepRT);
        }
    }
}

Here is the snippet of the output:

[Figure: Console output of the Esper example run]

In the preceding snippet, CEPListener is the implementation of UpdateListener (which listens for the arrival of events), newData holds the stream of one or more newly arrived events, and oldData holds the previous state of the stream, that is, the state before the arrival of the current trigger to the listener.

In the main method, we can load the Esper configuration or, as shown in our preceding case, create a default configuration. Then, we create an Esper runtime engine instance and bind the EQL query to it.

If you look at the cepStatement.addListener(new CEPListener()) statement in the preceding code, you will see that we are also binding the listener to the statement, thus wiring all the pieces together.

Integrating Esper with Storm

The following figure depicts how we plan to use Esper in conjunction with one of the topologies we created earlier in Chapter 6, Adding NoSQL Persistence to Storm. The integration of Storm with Esper gives the developer the power to execute SQL-like queries on top of the stream of events being processed by Storm.

[Figure: Integrating an Esper bolt with the Storm topology from Chapter 6]

Here, we made some modifications to one of the earlier topologies we created, and we added an Esper bolt to the same topology. This bolt reads the same stream that is being dumped into Cassandra and performs an EQL execution through the Esper listener to filter out the set of records where the call duration is 0 seconds.

The following is a snippet from the ZeroDuration filter bolt that filters the CALL_END events that have a duration of 0 seconds to be emitted onto the stream feeding the Esper bolt:

/*
 * Bolt responsible for forwarding events that satisfy the following criteria:
 * <ul>
 * <li>the event should belong to the 'End' type</li>
 * <li>the duration should be zero</li>
 * </ul>
 */
public class ZeroSecondsCDRBolt extends BaseRichBolt {

    // Stream name and working fields used by this bolt
    private static final String CALL_END = "CALL_END";
    private static final Logger logger = LogUtils.getLogger(ZeroSecondsCDRBolt.class);
    private OutputCollector collector;
    private long eventCounter = 0;
    private long duration;
    private long callId;

    /**
     * Called when {@link ZeroSecondsCDRBolt} is initialized
     */
    @Override
    public void prepare(Map conf, TopologyContext context,
            OutputCollector collector) {
        logger.info("prepare method :: Start ");
        this.collector = collector;
        logger.info("prepare() conf {}, Collector {}", conf.toString(), collector.toString());
        logger.info("prepare method :: End ");
    }

    /**
     * Called whenever a new tuple is received by this bolt. This method
     * filters zero-duration End records
     */
    @Override
    public void execute(Tuple tuple) {
        logger.info("execute method :: Start ");
        if (tuple != null && tuple.getString(0) != null) {
            eventCounter++;
            String event = tuple.getString(0);
            logger.info("execute : event recd :: {}", event);
            if (event != null && event.contains("CALL_END")) {
                emitCallEndRecords(tuple);
            }
            collector.ack(tuple);
        }
        logger.info("execute method :: End ");
    }

    private void emitCallEndRecords(Tuple tuple) {
        String event = tuple.getString(0);
        try {
            // splitting the comma-separated event record into its fields
            String[] eventTokens = event.split(",");
            duration = Long.parseLong(eventTokens[4]);
            callId = Long.parseLong(eventTokens[0]);
            if (duration == 0) {
                logger.debug(" Event (callId = {}) is a Zero duration Qualifier ", callId);
                collector.emit(....); // emit the qualifying record onto the CALL_END stream (fields elided in this excerpt)
            }
        } catch (Exception e) {
            logger.error("Corrupt Stopped record. Error occurred while parsing the event : {}", event);
        }
    }

    /**
     * Declares output fields in the tuple emitted from this bolt
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CALL_END, new Fields());
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

The next step is to plug the Esper bolt into the topology. The Esper bolt can be downloaded as a bundle from https://github.com/tomdz/storm-esper, and it can be quickly wired into the topology using the following code:

EsperBolt esperBolt = new EsperBolt.Builder()
    .inputs()
        .aliasComponent("ZeroSecondCallBolt")
        .withFields("a", "b")
        .ofType(Integer.class)
        .toEventType("CALL_END")
    .outputs()
        .onDefaultStream()
        .emit("count")
    .statements()
        .add("select callID as CALL_ID, callType as CALL_TYPE, count(*) as OCCURRENCE_CNT "
            + "from CDR.win:time_batch(5 minutes) "
            + "where (eventType = 'CALL_END') and (duration = 0) "
            + "group by callID, eventType "
            + "having count(*) > 0 "
            + "order by OCCURRENCE_CNT desc")
    .build();

Here is what the output would be like:

[Figure: Sample output of the Esper query over the CDR stream]

The Esper query in the preceding figure executes on a stream of incoming data; here is its breakdown and explanation:

select callID as CALL_ID, callType as CALL_TYPE, count(*) as OCCURRENCE_CNT

Here, we are selecting the following fields from the incoming tuples: callID, callType, and the occurrence count:

from CDR.win:time_batch(5 minutes) where (eventType = 'CALL_END') and (duration = 0) group by callID, eventType having count(*) > 0 order by OCCURRENCE_CNT desc

We are operating on the CDR event stream through a win:time_batch view with a batch size of 5 minutes, which means that arriving events or tuples are collected into 5-minute batches and the query is executed over the data that arrived in each batch. The results are grouped by callID and event type and are sorted in descending order of the occurrence count.
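Finally, here is a rough sketch of how the Esper bolt could be attached downstream of the zero-duration filter bolt in the topology. The spout class and parallelism hints are assumptions made for illustration:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("cdrSpout", new CDRSpout());                  // hypothetical source of CDR events
builder.setBolt("ZeroSecondCallBolt", new ZeroSecondsCDRBolt(), 2)
       .shuffleGrouping("cdrSpout");
builder.setBolt("EsperBolt", esperBolt, 1)                     // the EsperBolt built in the previous snippet
       .shuffleGrouping("ZeroSecondCallBolt", "CALL_END");     // subscribe to the CALL_END stream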

Quiz time

Q.1. State whether the following statements are true or false:

1. Cache is a read-only memory space.

2. Data once added in cache remains there forever.

3. CEP lets SQL-like queries be implemented over streaming data.

4. Esper is based on event-driven architecture.

Q.2. Fill in the blanks:

1. _______________ is the algorithm for memcache.

2. When data is not present in the cache, it's called ______________.

3. _______________ is the component of Esper that triggers the Event Query Language (EQL) execution.

4. _______________ is generally used for the time series windowing function data.

Q.3. Create an end-to-end topology using Storm and Esper in conjunction to display the top 10 speeding devices on a given freeway.

Summary

In this chapter, we covered the concept of a cache in conjunction with Storm, and the utility and applications that a solutions developer can address with a cache. We also learned about memcached as a caching system.

In the later part of the chapter, we explored Esper as a complex event processing system and understood its integration with Storm topologies.