Chapter 12. Plugin Development

In this chapter, we will cover the following recipes:

· Creating a site plugin

· Creating a native plugin

· Creating a REST plugin

· Creating a cluster action

· Creating an analyzer plugin

· Creating a river plugin

Introduction

ElasticSearch can be extended with plugins to improve its capabilities. In the previous chapters, we have installed and used many of these plugins, such as transport, river, and scripting.

Plugins are application extensions that can add many features to ElasticSearch. They have several usages, as follows:

· Adding a new transport layer (the thrift and memcached plugins are examples of this type)

· Adding a new scripting language (such as Python and JavaScript plugins)

· Extending Lucene-supported analyzers and tokenizers

· Using native scripting to speed up the computation of scores, filters, and field manipulation

· Extending node capabilities, for example, creating a node plugin that can execute your logic

· Adding a new river to support new sources

· Monitoring and administering the cluster

ElasticSearch plugins come in two kinds: site plugins and native plugins.

A site plugin is generally a standard HTML5 web application, while a native plugin has some Java content that defines a plugin's endpoints and implements plugin functionalities.

In this chapter, we will use the Java language to develop the native plugin, but it is possible to use any JVM language that generates JAR files.

Creating a site plugin

Site plugins do not add internal functionalities to ElasticSearch. They are HTML-based web applications that work on top of ElasticSearch. They generally provide frontend functionalities, such as monitoring and administration. In Chapter 9, Cluster and Node Monitoring, we saw the use of several site plugins such as ElasticSearch Head and BigDesk.

Getting ready

You will need a working ElasticSearch node, a web browser, and your preferred HTML editor.

How to do it...

In order to create a site plugin, perform the following steps:

1. The site plugin is one of the easiest plugins to develop. It is mainly a standard web application composed only of HTML, JavaScript, and images. The simplest plugin is composed of a single index.html page, as shown here:

<!DOCTYPE html>
<html>
<head>
<title>Simple site plugin</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link href="http://netdna.bootstrapcdn.com/twitter-bootstrap/2.3.0/css/bootstrap-combined.min.css" rel="stylesheet">
</head>
<body>
<h1>Hello, from the site plugin!</h1>
<script src="http://code.jquery.com/jquery.js"></script>
<script src="http://netdna.bootstrapcdn.com/twitter-bootstrap/2.3.0/js/bootstrap.min.js"></script>
</body>
</html>

2. The HTML file and the resources must be put in the _site directory under the plugin directory.

How it works...

When ElasticSearch starts, it analyzes every plugin's directory. If a _site directory is present in the plugin directory, the plugin is loaded as a site plugin; otherwise, it is considered a native plugin.

Site plugins have static contents. When the browser is pointed to the plugin's address on the server (that is, http://localhost:9200/_plugin/<plugin_name>/), ElasticSearch serves the request as a traditional web application would: it searches for an index.html file and serves it along with its related resources.

Note

While writing a plugin and loading resources (that is, images, JavaScript, or CSS), make sure that every resource is specified relative to the index.html file or has an absolute URL, in order to prevent errors.

Site plugins work very well to package a small web application that executes some focused tasks, as follows:

· Displaying information regarding status and data aggregation, and a quick view of some important aspects of your ElasticSearch cluster or indices.

· Administration and sending commands via a web interface is easier than via curl commands or the programming API. Users can aggregate their administrative pipeline (index creation, data manipulation, and custom commands) and use it to manage their custom data.

Tip

To easily develop your plugin, I suggest that you develop it outside ElasticSearch and pack it in a ZIP file for distribution.

Site plugins allow the use of every HTML5 web application framework available for client-side development. It's quite normal that the currently available site plugins use different JavaScript frameworks, such as jQuery (including Bootstrap), AngularJS, and Ember.js.

There's more…

Many of the interfaces used to manage an ElasticSearch cluster are generally developed as site plugins. These are the most popular ones:

· The BigDesk plugin

· The ElasticSearch-head plugin

· Elastic HQ

We have already seen many of these plugins in Chapter 9, Cluster and Node Monitoring.

See also

· You can get more information on ElasticSearch plugins at http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-plugins.html#_plugins

Creating a native plugin

In the previous recipe, we saw the site plugin. ElasticSearch also allows you to create a more powerful type of plugin: the native JAR plugin.

Native plugins allow you to extend several aspects of the ElasticSearch server, but this requires a good knowledge of Java. Because these plugins are compiled to JVM bytecode, they are generally very fast. In this recipe, we will see how to set up a working environment in order to develop native plugins.

Getting ready

You will need a working ElasticSearch node, a Maven build tool, and optionally a Java IDE. The code of this recipe is available in the chapter12/simple_plugin directory, kept in the code bundle of the chapter on the Packt Publishing website.

How to do it...

Generally, ElasticSearch plugins are developed in Java using the Maven build tool and deployed as a ZIP file. In order to create a simple JAR plugin, perform the following steps:

1. To correctly build and serve a plugin, the following files must be defined:

· pom.xml: This file is used to define the build configuration for Maven.

· es-plugin.properties: This property file defines the namespace of the plugin class that must be loaded.

· The <Name>Plugin class: In Java, this is the main plugin class; it is loaded at startup and initializes the plugin actions.

· plugin.xml: This assembly descriptor defines how to execute the Maven assembly steps. It is used to build the ZIP file that delivers the plugin.

2. A standard pom.xml file used to create a plugin contains the following code:

· This is how a Maven pom.xml header will look:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<name>elasticsearch-simple-plugin</name>
<modelVersion>4.0.0</modelVersion>
<groupId>com.packtpub</groupId>
<artifactId>simple-plugin</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<description>A simple plugin for ElasticSearch</description>
<inceptionYear>2013</inceptionYear>
<licenses>… </licenses>

· This is the parent pom.xml file used to derive common properties or settings:

<parent>
<groupId>org.sonatype.oss</groupId>
<artifactId>oss-parent</artifactId>
<version>7</version>
</parent>

· Some properties mainly used to simplify the dependencies are given as follows:

<properties>
<elasticsearch.version>1.4.0</elasticsearch.version>
</properties>

· A list of JAR dependencies:

<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!-- test dependencies -->
</dependencies>

· A list of Maven plugins required to build and deploy the artifact; the following is the code for enabling the Maven plugins:

<build>
<plugins>
<plugin>
<!-- for compiling -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<!-- optional for executing tests -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.3</version>
<configuration>
<includes>
<include>**/*Tests.java</include>
</includes>
</configuration>
</plugin>
<plugin>
<!-- optional for publishing the source -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<!-- for packaging the plugin -->
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<outputDirectory>${project.build.directory}/releases/</outputDirectory>
<descriptors>
<descriptor>${basedir}/src/main/assemblies/plugin.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

· In the JAR, there must be a src/main/resources/es-plugin.properties file that defines the entry point class that must be loaded during plugin initialization. This file must be embedded in the final JAR, so it is usually put in the src/main/resources directory of the Maven project. It generally contains a single line of code:

plugin=org.elasticsearch.plugin.simple.SimplePlugin

3. Optionally, in the src/main/resources/es-plugin.properties file, a version of the plugin can be provided, as follows:

version=0.1

4. The src/main/java/org/elasticsearch/plugin/simple/SimplePlugin.java class is an example of the minimum code that needs to be compiled in order to execute a plugin:

package org.elasticsearch.plugin.simple;

import org.elasticsearch.plugins.AbstractPlugin;

public class SimplePlugin extends AbstractPlugin {

    @Override
    public String name() {
        return "simple-plugin";
    }

    @Override
    public String description() {
        return "A simple plugin implementation";
    }
}

5. To complete the compilation and deployment workflow, you need to define a src/main/assemblies/plugin.xml file used in the Maven assembly step. This file defines the resources that must be packaged into the final ZIP archive:

<?xml version="1.0"?>
<assembly>
  <id>plugin</id>
  <formats>
    <format>zip</format>
  </formats>
  <includeBaseDirectory>false</includeBaseDirectory>
  <dependencySets>
    <dependencySet>
      <outputDirectory>/</outputDirectory>
      <useProjectArtifact>true</useProjectArtifact>
      <useTransitiveFiltering>true</useTransitiveFiltering>
      <excludes>
        <exclude>org.elasticsearch:elasticsearch</exclude>
      </excludes>
    </dependencySet>
  </dependencySets>
</assembly>

How it works...

The development life cycle of a plugin comprises several parts, such as designing, coding, building, and deploying. To speed up the building and deployment steps, which are common to all plugins, you need to create a Maven pom.xml file.

The previously explained pom.xml file is a standard for developing ElasticSearch plugins. This file is composed of the following parts:

· Several section entries used to set up the current Maven project. In detail, we have the following sections:

· The name of the plugin (that is, elasticsearch-simple-plugin):

<name>elasticsearch-simple-plugin</name>

· The groupId and artifactId parameters are used to define the plugin's artifact name:

· <groupId>com.packtpub</groupId>

<artifactId>simple-plugin</artifactId>

· The plugin version:

<version>0.0.1-SNAPSHOT</version>

· The type of packaging:

<packaging>jar</packaging>

· A project description with the start year:

· <description>A simple plugin for ElasticSearch</description>

<inceptionYear>2013</inceptionYear>

· An optional license section is also provided in which you can define the license for the plugin. For the standard Apache license, it should look as follows:

· <licenses>

· <license>

· <name>The Apache Software License, Version 2.0</name>

· <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>

· <distribution>repo</distribution>

· </license>

</licenses>

· A parent POM is used to inherit common properties. Generally, for plugins, it is useful to inherit from the Sonatype POM file:

· <parent>

· <groupId>org.sonatype.oss</groupId>

· <artifactId>oss-parent</artifactId>

· <version>7</version>

</parent>

· The global variables are set. Typically, in this section, the ElasticSearch version and other library versions are set:

· <properties>

· <elasticsearch.version>1.4.0</elasticsearch.version>

</properties>

Tip

It's very important that the ElasticSearch JAR version matches the ElasticSearch cluster version in order to prevent issues that occur due to changes between releases.

· A list of dependencies is provided. In order to compile a plugin, the ElasticSearch jar and the log4j library are required during the compilation phase:

· <dependency>

· <groupId>org.elasticsearch</groupId>

· <artifactId>elasticsearch</artifactId>

· <version>${elasticsearch.version}</version>

· <scope>compile</scope>

· </dependency>

· <dependency>

· <groupId>log4j</groupId>

· <artifactId>log4j</artifactId>

· <version>1.2.17</version>

· <scope>runtime</scope>

</dependency>

· The Maven plugin section contains a list of the Maven plugins that execute several build steps, as follows:

· Compiler section: This requires a source compilation. The Java version is fixed to 1.7:

· <plugin>

· <groupId>org.apache.maven.plugins</groupId>

· <artifactId>maven-compiler-plugin</artifactId>

· <version>3.1</version>

· <configuration>

· <source>1.7</source>

· <target>1.7</target>

· </configuration>

</plugin>

· Source section: This enables the creation of source packages to be released with the binary output (useful for debugging):

· <plugin>

· <groupId>org.apache.maven.plugins</groupId>

· <artifactId>maven-source-plugin</artifactId>

· <version>2.3</version>

· <executions>

· <execution>

· <id>attach-sources</id>

· <goals>

· <goal>jar</goal>

· </goals>

· </execution>

· </executions>

</plugin>

· Assembly section: This builds a ZIP file using a configuration file (plugin.xml) and puts the output in the releases directory, as shown here:

· <plugin>

· <artifactId>maven-assembly-plugin</artifactId>

· <version>2.3</version>

· <configuration>

· <appendAssemblyId>false</appendAssemblyId>

· <outputDirectory>${project.build.directory}/releases/ </outputDirectory>

· <descriptors>

· <descriptor>${basedir}/src/main/assemblies/plugin.xml </descriptor>

· </descriptors>

· </configuration>

· <executions>

· <execution>

· <phase>package</phase>

· <goals><goal>single</goal></goals>

· </execution>

· </executions>

</plugin>

Related to pom.xml, we have the plugin.xml file that describes how to assemble the final ZIP file. This file is usually contained in the /src/main/assemblies/ directory of the project.

The following are the most important sections of this file:

· formats: In this section, the destination format is defined:

<formats><format>zip</format></formats>

· excludes: This is set in the dependencySet. It contains the artifacts to be excluded from the package. Generally, we exclude ElasticSearch jar, as it's already provided in the server installation:

· <dependencySet>

· <outputDirectory>/</outputDirectory>

· <useProjectArtifact>true</useProjectArtifact>

· <useTransitiveFiltering>true</useTransitiveFiltering>

· <excludes>

· <exclude>org.elasticsearch:elasticsearch</exclude>

· </excludes>

</dependencySet>

· includes: This is set in dependencySet. It contains the artifacts to be included in the package. These are mainly the jars required to run the plugin:

· <dependencySet>

· <outputDirectory>/</outputDirectory>

· <useProjectArtifact>true</useProjectArtifact>

· <useTransitiveFiltering>true</useTransitiveFiltering>

· <includes>… truncated …</includes>

</dependencySet>

During plugin packaging, the include and exclude rules are verified and only those files that are allowed to be distributed are put in the ZIP file. After having configured Maven, we can start to write the main plugin class. Every plugin class must be derived from the AbstractPlugin one and must be public; otherwise, it cannot be loaded dynamically from the JAR:

import org.elasticsearch.plugins.AbstractPlugin;

public class SimplePlugin extends AbstractPlugin {

The AbstractPlugin class needs two methods to be defined: name and description. The name method must return a string and it's usually a short name. This value is shown in the plugin's loading log:

@Override

public String name() {

return "simple-plugin";

}

The description method must also return a string. It is mainly a long description of the plugin:

@Override

public String description() {

return "A simple plugin implementation";

}

After having defined the files required to generate a ZIP release of our plugin, it is enough to invoke the Maven package command. This command will compile the code and create a ZIP package in the target/releases directory of your project. The final ZIP file can be deployed as a plugin on your ElasticSearch cluster.

In this recipe, we have configured a working environment to build, deploy, and test plugins. In the following recipes, we will reuse this environment to develop several plugin types.

There's more…

Compiling and packaging the plugin is not enough to define a good life cycle for your plugin. For this, you need to add a test phase. Testing the plugin functionalities with test cases reduces the number of bugs that can affect the plugin when it is released.

It is possible to add a test phase in the Maven build pom.xml. In order to do this, we first need to add the required package dependencies to test ElasticSearch and Lucene. These dependencies must be added for testing:

<dependency>

<groupId>org.apache.lucene</groupId>

<artifactId>lucene-test-framework</artifactId>

<version>${lucene.version}</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.elasticsearch</groupId>

<artifactId>elasticsearch</artifactId>

<version>${elasticsearch.version}</version>

<type>test-jar</type>

<scope>test</scope>

</dependency>

The order is very important, so make sure to put lucene-test-framework at the top of your dependencies; otherwise, problems with loading and executing tests might occur.

For unit and integration testing, the ElasticSearch community mainly uses the Hamcrest library (https://code.google.com/p/hamcrest/). To use the library, you need to add its dependencies in the dependency section of the pom.xml file, as follows:

<dependency>

<groupId>org.hamcrest</groupId>

<artifactId>hamcrest-core</artifactId>

<version>1.3.RC2</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.hamcrest</groupId>

<artifactId>hamcrest-library</artifactId>

<version>1.3.RC2</version>

<scope>test</scope>

</dependency>

Note

Note that the scope of these dependencies is test, which means that they are applicable only during the test phase.

To complete the test part, we need to add a Maven plugin that executes the tests:

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-surefire-plugin</artifactId>

<version>2.12.3</version>

<configuration>

<includes><include>**/*Tests.java</include></includes>

</configuration>

</plugin>

The includes section lists, via glob expressions, all the possible classes that contain tests.
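A minimal test class matching this glob could look like the following sketch. It is not part of the book's code bundle; it assumes the ElasticsearchIntegrationTest base class shipped in the ElasticSearch 1.x test JAR added previously, and it only checks that the in-memory test cluster answers a health request:

package org.elasticsearch.plugin.simple;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

import static org.hamcrest.Matchers.equalTo;

public class SimplePluginTests extends ElasticsearchIntegrationTest {

    @Test
    public void testClusterIsUsable() throws Exception {
        // the test framework starts an in-memory cluster for us
        ClusterHealthResponse health = client().admin().cluster().prepareHealth().get();
        assertThat(health.isTimedOut(), equalTo(false));
    }
}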

Creating a REST plugin

The previous recipe described how to set up a working environment and the steps required to build a native plugin. In this recipe, we will see how to create one of the most common ElasticSearch plugins: the REST plugin.

This kind of plugin allows you to extend the standard REST calls with custom ones to easily improve the capabilities of ElasticSearch.

In this recipe, we will see how to define a REST entry point and create its action. In the next recipe, we will see how to execute this action and distribute it in shards.

Getting ready

You will need a working ElasticSearch node, a Maven build tool, and an optional Java IDE. The code of this recipe is available in the chapter12/rest_plugin directory in the code bundle of the same chapter, which can be downloaded from the Packt Publishing website.

How to do it...

To create a REST entry point, we first need to create the action and then register it in the plugin. Perform the following steps:

1. Create a simple REST action (RestSimpleAction.java), as follows:

public class RestSimpleAction extends BaseRestHandler {
    @Inject
    public RestSimpleAction(Settings settings, Client client, RestController controller) {
        super(settings, controller, client);
        controller.registerHandler(POST, "/_simple", this);
        controller.registerHandler(POST, "/{index}/_simple", this);
        controller.registerHandler(POST, "/_simple/{field}", this);
        controller.registerHandler(GET, "/_simple", this);
        controller.registerHandler(GET, "/{index}/_simple", this);
        controller.registerHandler(GET, "/_simple/{field}", this);
    }

    @Override
    protected void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
        final SimpleRequest simpleRequest = new SimpleRequest(Strings.splitStringByCommaToArray(request.param("index")));
        simpleRequest.setField(request.param("field"));
        client.execute(SimpleAction.INSTANCE, simpleRequest, new ActionListener<SimpleResponse>() {
            @Override
            public void onResponse(SimpleResponse response) {
                try {
                    XContentBuilder builder = channel.newBuilder();
                    builder.startObject();
                    builder.field("ok", true);
                    buildBroadcastShardsHeader(builder, response);
                    builder.array("terms", response.getSimple().toArray());
                    builder.endObject();
                    channel.sendResponse(new BytesRestResponse(OK, builder));
                } catch (Exception e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Throwable e) {
                try {
                    channel.sendResponse(new BytesRestResponse(channel, e));
                } catch (IOException e1) {
                    logger.error("Failed to send failure response", e1);
                }
            }
        });
    }
}

2. Also, we need to register the entry point in the plugin using the following lines of code:

public class RestPlugin extends AbstractPlugin {
    @Override
    public String name() {
        return "simple-plugin";
    }

    @Override
    public String description() {
        return "A simple plugin implementation";
    }

    public void onModule(RestModule module) {
        module.addRestAction(RestSimpleAction.class);
    }
}

How it works...

Adding a REST action is very easy. We just need to create a RestXXXAction class that handles the calls. The REST action is derived from the BaseRestHandler class and needs to implement the handleRequest method. The constructor is very important, as shown here:

@Inject

public RestSimpleAction(Settings settings, Client client, RestController controller)

The constructor's signature is usually injected via Guice, which is a lightweight dependency injection framework that is very popular in the Java ecosystem. For more details on Guice, refer to the library's home page at https://github.com/google/guice. The REST action has the following parameters:

· Settings: This can be used to load custom settings for your REST action

· Client: This will be used to communicate with the cluster (see Chapter 10, Java Integration)

· RestController: This is used to register the REST action to the controller

In the constructor of the REST action, the list of actions that must be handled is registered with the RestController, as follows:

controller.registerHandler(POST, "/_simple", this);

To register an action, the following parameters must be passed to the controller:

· The REST method (GET/POST/PUT/DELETE/HEAD/OPTIONS)

· The URL entry point

· The RestHandler class, usually the same class that must answer the call

After having defined the constructor, when an action is fired, the handleRequest method is called in the following manner:

@Override

protected void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {

This method is the core of the REST action. It processes the request and sends the response back. These parameters are passed to the method:

· RestRequest: This is the REST request that hits the ElasticSearch server

· RestChannel: This is the channel used to send back the response

· Client: This is the client used to communicate in the cluster

A handleRequest method is usually composed of the following phases:

· Processing the REST request and building an inner ElasticSearch request object

· Calling the client with the ElasticSearch request

· If this is ok, processing the ElasticSearch response and building the JSON result

· If there are errors, sending the JSON error response back

The following example shows how to create SimpleRequest by processing the request:

final SimpleRequest simpleRequest = new SimpleRequest(Strings.splitStringByCommaToArray (request.param("index")));

simpleRequest.setField(request.param("field"));

The request accepts a list of indices (we split the classic comma-separated list of indices via the Strings.splitStringByCommaToArray helper), and we have the field parameter, if available. We will discuss SimpleRequest thoroughly in the next recipe.

Now that we have SimpleRequest, we can send it to the cluster and get back a SimpleResponse response:

client.execute(SimpleAction.INSTANCE, simpleRequest, new ActionListener<SimpleResponse>() {

The client.execute method accepts an action, a request, and an ActionListener class that maps a future response. We can have two kinds of responses, as follows:

· onResponse: This is obtained if everything is all right

· onFailure: This is obtained if something goes wrong

The onFailure method usually propagates the error back as a REST error response:

@Override

public void onFailure(Throwable e) {

try {

channel.sendResponse(new BytesRestResponse(channel, e));

} catch (IOException e1) {

logger.error("Failed to send failure response", e1);

}

}

The onResponse method receives a Response object that must be converted into a JSON result:

@Override public void onResponse(SimpleResponse response)

To build the JSON response, a builder helper is used:

XContentBuilder builder = channel.newBuilder();

The builder is a standard JSON XContentBuilder, which we have already seen in Chapter 10, Java Integration. After having processed the cluster response and built the JSON, it can be sent via the following channel:

channel.sendResponse(new BytesRestResponse(OK, builder));

Obviously, if something goes wrong during JSON creation, an exception must be raised:

try {/* JSON building*/

} catch (Exception e) {

onFailure(e);

}

There's more…

To test the plugin, you can compile and assemble it with mvn package. Then, you need to unzip the resulting file into the plugins directory of an ElasticSearch server. After having restarted the server, the name of the plugin should pop up in the list of installed ones:

[…][INFO ][node ] [Amalgam] initializing ...

[…][INFO ][plugins ] [Amalgam] loaded [river-twitter, transport-thrift, jdbc-river, rest-plugin], sites [HQ]

If everything is all right, we can test the plugin as follows:

curl -XPOST http://127.0.0.1:9200/_simple

This is how the response will look:

{"ok":true,"_shards":{"total":15,"successful":15,"failed":0},"terms":["null_4","null_1","null_0","null_3","null_2"]}

You can also test it using the following line of code:

curl -XPOST http://127.0.0.1:9200/_simple/goofy

Here, this is how the response will look:

{"ok":true,"_shards":{"total":15,"successful":15,"failed":0},"terms":["goofy_1","goofy_2","goofy_3","goofy_4","goofy_0"]}

To fully understand the response, the next recipe will show you how the action is executed at cluster level.

See also

· You can find more information about Google Guice, used for dependency injection, at https://code.google.com/p/google-guice/

Creating a cluster action

In the previous recipe, we saw how to create a REST entry point but, to execute the action at cluster level, we need to create a cluster action.

An ElasticSearch action is generally executed and distributed in the cluster; in this recipe, we will see how to implement this kind of action. The cluster action will be a bare minimum: we will send a string value to every shard, and each shard will echo back a resultant string, which is a concatenation of that value with the shard number.

Getting ready

You need a working ElasticSearch node, a Maven build tool, and an optional Java IDE. The code of this recipe is available in the chapter12/rest_plugin directory.

How to do it...

In this recipe, we will see that a REST call is converted to an internal cluster action. To execute an internal cluster action, the following classes are required:

· The Request and Response classes to communicate with the cluster.

· A RequestBuilder class used to execute a request to the cluster.

· An Action class used to register the action and bind it to Request, Response, and RequestBuilder.

· A Transport*Action class to bind the request and the response to ShardRequest and ShardResponse, respectively. It manages the reduce part of the query.

· The ShardRequest and ShardResponse classes to manage a shard query.

To convert a REST call into a cluster action, we will perform the following steps:

1. Write a SimpleRequest class, as follows:

public class SimpleRequest extends BroadcastOperationRequest<SimpleRequest> {
    private String field;

    SimpleRequest() {}

    public SimpleRequest(String... indices) {
        super(indices);
    }

    public void setField(String field) {
        this.field = field;
    }

    public String getField() {
        return field;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        field = in.readString();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(field);
    }
}

2. The SimpleResponse class is very similar to the SimpleRequest class. To bind the request and the response, an action (SimpleAction) is required, as follows:

public class SimpleAction extends ClientAction<SimpleRequest, SimpleResponse, SimpleRequestBuilder> {
    public static final SimpleAction INSTANCE = new SimpleAction();
    public static final String NAME = "indices/simple";

    private SimpleAction() {
        super(NAME);
    }

    @Override
    public SimpleResponse newResponse() {
        return new SimpleResponse();
    }

    @Override
    public SimpleRequestBuilder newRequestBuilder(Client client) {
        return new SimpleRequestBuilder(client);
    }
}

3. The Transport class is the core of the action. The code for this class is quite long, so we will present only the important parts, as follows:

public class TransportSimpleAction extends TransportBroadcastOperationAction<SimpleRequest, SimpleResponse, ShardSimpleRequest, ShardSimpleResponse> {

    @Override
    protected SimpleResponse newResponse(SimpleRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
        int successfulShards = 0;
        int failedShards = 0;
        List<ShardOperationFailedException> shardFailures = null;
        Set<String> simple = new HashSet<String>();
        for (int i = 0; i < shardsResponses.length(); i++) {
            Object shardResponse = shardsResponses.get(i);
            if (shardResponse == null) {
                // a non active shard, ignore...
            } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
                failedShards++;
                if (shardFailures == null) {
                    shardFailures = newArrayList();
                }
                shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
            } else {
                successfulShards++;
                if (shardResponse instanceof ShardSimpleResponse) {
                    ShardSimpleResponse resp = (ShardSimpleResponse) shardResponse;
                    simple.addAll(resp.getTermList());
                }
            }
        }
        return new SimpleResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, simple);
    }

    @Override
    protected ShardSimpleResponse shardOperation(ShardSimpleRequest request) {
        synchronized (simpleMutex) {
            InternalIndexShard indexShard = (InternalIndexShard) indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId().id());
            indexShard.store().directory();
            Set<String> set = new HashSet<String>();
            set.add(request.getField() + "_" + request.shardId());
            return new ShardSimpleResponse(request.shardId(), set);
        }
    }
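The recipe does not show how the action and its transport class are wired together. In ElasticSearch 1.x, this is generally done in the plugin class via the ActionModule hook; a hedged sketch, reusing the classes defined previously, is as follows:

public void onModule(ActionModule module) {
    // bind the action name to its transport (executor) implementation
    module.registerAction(SimpleAction.INSTANCE, TransportSimpleAction.class);
}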

How it works...

As you saw, in order to execute a cluster action, the following classes are required:

· A couple of Request/Response classes to interact with the cluster

· A task action at the cluster level

· A couple of Request/Response classes to interact with the shards

· A Transport class to manage the map/reduce shard part that must be invoked by the REST call

These classes must extend one of the supported kinds of action available, as follows (a minimal request sketch for one of these kinds is shown after the list):

· BroadcastOperationRequest/Response: This is used for actions that must be spread across all the shards of the cluster.

· MasterNodeOperationRequest/Response: This is used for actions that must be executed only by the master node (such as index and mapping configuration). In order to get a simple acknowledgement on the master, there are AcknowledgedRequest/Response actions available.

· NodeOperationRequest: This is used for actions that must be executed by every node (that is, for all the node statistic actions).

· IndexReplicationOperationRequest: This is used for an action that must be executed at an index level (that is, the delete by query operation).

· SingleCustomOperationRequest: This is used for an action that must be executed only by a node (that is, analyze actions).

· InstanceShardOperationRequest: This is used for an action that must be executed on every shard instance (that is, bulk shard operations).

· SingleShardOperationRequest: This is used for an action that must be executed only in a shard (that is, the get action).
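For example, a bare request for a master-node-only action could look like the following sketch. This is only an illustration and is not part of the recipe's code; it assumes the 1.x MasterNodeOperationRequest base class named above, whose exact signature may differ between releases:

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;

public class SimpleMasterRequest extends MasterNodeOperationRequest<SimpleMasterRequest> {

    @Override
    public ActionRequestValidationException validate() {
        return null; // nothing to validate in this minimal sketch
    }
}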

In our example, we defined an action that will be broadcast to every shard:

public class SimpleRequest extends BroadcastOperationRequest<SimpleRequest>

All the Request/Response classes implement the Streamable interface; thus, for serializing their content, the following two methods must be provided:

· The readFrom method that reads from a StreamInput class, a class that encapsulates common input stream operations. This method allows you to deserialize the data that you transmit on the wire. In the previous example, we read a string with the following code:

· @Override

· public void readFrom(StreamInput in) throws IOException {

· super.readFrom(in);

· field = in.readString();

}

· The writeTo method writes the contents of the class to be sent via a network. The StreamOutput class provides convenient methods to process the output. In the preceding example, we have serialized a string, as follows:

· @Override

· public void writeTo(StreamOutput out) throws IOException {

· super.writeTo(out);

· out.writeString(field);

}

In both methods, super must be called to allow the correct serialization of the parent classes.

Tip

Every internal action in ElasticSearch is designed as a request/response pattern.

To complete the request/response action, we must define an action that binds the request to the correct response and a builder to construct it. To do so, we need to define an Action class, as follows:

public class SimpleAction extends ClientAction<SimpleRequest, SimpleResponse, SimpleRequestBuilder>

This Action object is a singleton object. We can obtain it by creating a default static instance and a private constructor, as follows:

public static final SimpleAction INSTANCE = new SimpleAction();

public static final String NAME = "indices/simple";

private SimpleAction() {super(NAME);}

The static string NAME is used to uniquely identify the action at the cluster level. To complete the Action definition, the following two methods must be defined:

· The newResponse method, which is used to create a new empty response:

· @Override public SimpleResponse newResponse() {

· return new SimpleResponse();

}

· The newRequestBuilder method, which is used to return a new request builder for the current action type:

· @Override

· public SimpleRequestBuilder newRequestBuilder(Client client) {

· return new SimpleRequestBuilder(client);

}

When the action is executed, the request and the response are serialized and sent to the cluster. To execute your custom code at the cluster level, a Transport action is required. Transport actions are usually defined as map and reduce jobs. The map part consists of executing the action on several shards (via the ShardRequest and ShardResponse methods), and the reduce part consists of collecting all the results from the shards in a response that must be sent back to the requester.

The Transport action is a long class with many methods, but the most important ones are the shardOperation (the map part) and newResponse (the reduce part) methods. The original request is converted into a distributed ShardRequest that is processed by the shardOperation method:

@Override protected ShardSimpleResponse shardOperation(ShardSimpleRequest request){

It is a good design principle to execute the shard operation using a lock to prevent concurrency problems:

synchronized (simpleMutex) {…}

To obtain the internal shard, we need to ask the IndexService to return a shard of the required index. The shard request contains the index and the ID of the shard that must be used to execute the action:

InternalIndexShard indexShard = (InternalIndexShard) indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId().id());

The InternalIndexShard object allows you to execute every possible shard operation (search, get, index, and many others). In this method, you can execute every data shard manipulation that you want.
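As an illustration only (it is not part of the recipe), inside shardOperation we could, for instance, add the shard's document count to the result set; this sketch assumes the 1.x IndexShard.docStats() API:

// hypothetical addition inside shardOperation, after obtaining indexShard
long docCount = indexShard.docStats().getCount();
set.add(request.getField() + "_docs_" + docCount);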

Tip

Custom shard actions can execute an application's business operation in a distributed and fast way.

In the following example, we have created a simple set of values:

Set<String> set = new HashSet<String>();

set.add(request.getField() + "_" + request.shardId());

The final step of our shard operation is to create a response to be sent back to the reduce step. In order to create the shard response, we need to return the result plus information about the index and the shard that executed the action:

return new ShardSimpleResponse(request.shardId(), set);

The distributed shard operations are collected in the reduce step (the newResponse method). This step aggregates all the shard results and sends back the result to the original action:

@Override protected SimpleResponse newResponse(SimpleRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState){

Apart from the result, we also need to collect information about the shard's execution (if there are failed shard executions). This information is usually collected in three values: successfulShards, failedShards, and shardFailures:

int successfulShards = 0;

int failedShards = 0;

List<ShardOperationFailedException> shardFailures = null;

The request result is a set of collected strings, as shown here:

Set<String> simple = new HashSet<String>();

To collect the results, we need to iterate over the shard responses:

for (int i = 0; i < shardsResponses.length(); i++) {

Object shardResponse = shardsResponses.get(i);

We need to skip the null shardResponse, mainly due to inactive shards:

if (shardResponse == null) {}

If an exception is raised, we also need to collect information about them to inform the caller:

else if (shardResponse instanceof BroadcastShardOperationFailedException) {

failedShards++;

if (shardFailures == null) {

shardFailures = newArrayList();

}

shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));

At last, we can aggregate the valid results:

} else {

successfulShards++;

if (shardResponse instanceof ShardSimpleResponse) {

ShardSimpleResponse resp = (ShardSimpleResponse) shardResponse;

simple.addAll(resp.getTermList());

}

}

The final step is to create the response from the results collected in the previous steps, together with the shard status information, using the following code:

return new SimpleResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, simple);

Creating a cluster action is required when there are low-level operations that you want to execute very fast, such as a special facet or a complex manipulation. These operations would otherwise require too many ElasticSearch calls to be executed, but they can be easily written as a cluster action.
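Once the plugin is installed, the action can also be invoked from Java code just like any built-in action. The following hedged sketch reuses SimpleAction and SimpleRequest from this recipe and assumes a client connected to the cluster (see Chapter 10, Java Integration) and a hypothetical index name:

SimpleRequest request = new SimpleRequest("test-index");
request.setField("goofy");
SimpleResponse response = client.execute(SimpleAction.INSTANCE, request).actionGet();
// the collected terms, for example [goofy_0, goofy_1, ...]
System.out.println(response.getSimple());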

See also

· The Creating a REST plugin recipe in this chapter

Creating an analyzer plugin

ElasticSearch provides, out-of-the-box, a large set of analyzers and tokenizers to cover general needs. Sometimes, we need to extend the capabilities of ElasticSearch to add new analyzers. Typically, you need to create an analyzer plugin when you need to do the following:

· Adding standard Lucene analyzers/tokenizers, which are not provided by ElasticSearch

· Integrating third party analyzers

· Adding a custom analyzer

In this recipe, we will add a new custom English analyzer similar to the one provided by ElasticSearch.

Getting ready

You will need a working ElasticSearch node, a Maven build tool, and optionally a Java IDE. The code of this recipe is available in the chapter12/analysis_plugin directory.

How to do it...

An analyzer plugin is generally composed of the following three classes:

· A plugin class that registers the BinderProcessor class

· A BinderProcessor class that registers one or more AnalyzerProvider classes

· An AnalyzerProvider class that provides an analyzer

To create an analyzer plugin, perform the following steps:

1. The plugin class is the same as the one used in the previous recipes, plus a binder registration method:

@Override
public void processModule(Module module) {
    if (module instanceof AnalysisModule) {
        AnalysisModule analysisModule = (AnalysisModule) module;
        analysisModule.addProcessor(new CustomEnglishBinderProcessor());
    }
}

2. The binder processor registers one or more analyzer providers in the analysis module:

public class CustomEnglishBinderProcessor extends AnalysisModule.AnalysisBinderProcessor {

    @Override
    public void processAnalyzers(AnalyzersBindings analyzersBindings) {
        analyzersBindings.processAnalyzer(CustomEnglishAnalyzerProvider.NAME, CustomEnglishAnalyzerProvider.class);
    }
}

3. The analyzer provider class initializes our analyzer by passing the parameters provided in the settings:

import org.apache.lucene.analysis.en.EnglishAnalyzer;
import org.apache.lucene.analysis.util.CharArraySet;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.assistedinject.Assisted;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.analysis.AbstractIndexAnalyzerProvider;
import org.elasticsearch.index.analysis.Analysis;
import org.elasticsearch.index.settings.IndexSettings;

public class CustomEnglishAnalyzerProvider extends AbstractIndexAnalyzerProvider<EnglishAnalyzer> {
    public static String NAME = "custom_english";

    private final EnglishAnalyzer analyzer;

    @Inject
    public CustomEnglishAnalyzerProvider(Index index, @IndexSettings Settings indexSettings, Environment env, @Assisted String name, @Assisted Settings settings) {
        super(index, indexSettings, name, settings);
        analyzer = new EnglishAnalyzer(version,
                Analysis.parseStopWords(env, settings, EnglishAnalyzer.getDefaultStopSet(), version),
                Analysis.parseStemExclusion(settings, CharArraySet.EMPTY_SET, version));
    }

    @Override
    public EnglishAnalyzer get() {
        return this.analyzer;
    }
}

After having built the plugin and installed it on the ElasticSearch server, our analyzer is accessible just like any native ElasticSearch analyzer.
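For example, after installing the plugin and creating an index, the analyzer can be checked via the Java analyze API. This is a hedged sketch, not part of the book's code: it assumes a client connected to the cluster (see Chapter 10, Java Integration) and a hypothetical index called test:

// ask the index to analyze some text with our registered analyzer
AnalyzeResponse analyzeResponse = client.admin().indices()
        .prepareAnalyze("test", "The quick brown foxes")
        .setAnalyzer(CustomEnglishAnalyzerProvider.NAME)
        .execute().actionGet();
for (AnalyzeResponse.AnalyzeToken token : analyzeResponse.getTokens()) {
    System.out.println(token.getTerm()); // stemmed terms such as "quick", "brown", "fox"
}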

How it works...

Creating an analyzer plugin is quite simple. This is the general workflow:

1. Wrap the analyzer initialization in a provider.

2. Register the analyzer provider in the binder so that the analyzer is accessible via the analysis module level.

3. Register the binder in the plugin.

In the previous example, we registered a CustomEnglishAnalyzerProvider class that provides an EnglishAnalyzer:

public class CustomEnglishAnalyzerProvider extends AbstractIndexAnalyzerProvider<EnglishAnalyzer>

We need to provide a name to the analyzer:

public static String NAME="custom_english";

We instantiate a private scope Lucene analyzer, provided on request with the get method:

private final EnglishAnalyzer analyzer;

The CustomEnglishAnalyzerProvider constructor can be injected via Google Guice with settings that can be used to provide cluster defaults via index settings or elasticsearch.yml:

@Inject

public CustomEnglishAnalyzerProvider(Index index, @IndexSettings Settings indexSettings, Environment env, @Assisted String name, @Assisted Settings settings){

For our analyzer to work correctly, we need to set up the parent constructor via the super call:

super(index, indexSettings, name, settings);

Now, we can initialize the internal analyzer that must be returned by the get method:

analyzer = new EnglishAnalyzer(version,Analysis.parseStopWords(env, settings, EnglishAnalyzer.getDefaultStopSet(), version),

Analysis.parseStemExclusion(settings, CharArraySet.EMPTY_SET, version));

This analyzer accepts the following:

· The Lucene version

· A list of stopwords that can be loaded from the settings or set to the default ones

· A list of words that must be excluded by the stemming step

After having created a provider for our analyzer, we need to create another class CustomEnglishBinderProcessor that registers our provider in the analyzer module:

public class CustomEnglishBinderProcessor extends AnalysisModule.AnalysisBinderProcessor{

To register our analyzer in the binder, we need to override the processAnalyzers method. Then, we add our analyzer by defining the name (referred to in the REST calls) and the class of our provider:

@Override public void processAnalyzers(AnalyzersBindings analyzersBindings){

analyzersBindings.processAnalyzer(CustomEnglishAnalyzerProvider.NAME, CustomEnglishAnalyzerProvider.class);

}

}

Finally, we need to register our binding in the plugin, hooking with processModule to check whether the module is an AnalysisModule:

@Override

public void processModule(Module module){

if (module instanceof AnalysisModule){

The analysis module allows you to register one or more bind processors that will be initialized during the analysis module service initialization via the addProcessor method:

AnalysisModule analysisModule = (AnalysisModule) module;

analysisModule.addProcessor(new CustomEnglishBinderProcessor());

Creating a river plugin

In Chapter 8, Rivers, we saw how powerful the river plugins are. They allow you to populate an ElasticSearch cluster from different sources (DBMS, NoSQL system, streams, and so on). Creating a custom river is necessary if you need to do the following:

· Add a new NoSQL data source that is not supported by the already existing plugins

· Add a new stream type

· Add a custom business logic to import data in ElasticSearch, such as field modification, data aggregation, and, in general, a data brewery

In this recipe, we will implement a simple river that generates documents with a field that contains an incremental value and ingests them in ElasticSearch.

Getting ready

You will need a working ElasticSearch node, a Maven build tool, and optionally a Java IDE. The code of this recipe is available in the chapter12/river_plugin directory.

How to do it...

To create a river plugin, we need the following three classes at least:

· The plugin that registers a river module

· A river module that registers our river

· The river that executes our business logic

Perform the following steps to create a river plugin:

1. This part of the plugin class is similar to the previous one:

…

public void onModule(RiversModule module) {
    module.registerRiver("simple", SimpleRiverModule.class);
}

(The common plugin part is omitted, as it is similar to the previous one.)

2. The river module registers the river class as a singleton:

public class SimpleRiverModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(River.class).to(SimpleRiver.class).asEagerSingleton();
    }
}

3. Now, we can write the river core. This code section is very long, so I have split it into several parts, as follows:

· This is the code for the class definition:

· … truncated …

· public class SimpleRiver extends AbstractRiverComponent implements River {

… truncated …

· The following code is the constructor definition, in which you set up the river and collect the user settings:

· @SuppressWarnings({"unchecked"})

· @Inject

· public SimpleRiver(RiverName riverName, RiverSettings settings, Client client, ThreadPool threadPool) {

· super(riverName, settings);

· this.client = client;

· if (settings.settings().containsKey("simple")) {

· Map<String, Object> simpleSettings = (Map<String, Object>) settings.settings().get("simple");

· simpleNumber = XContentMapValues.nodeIntegerValue( simpleSettings.get("number"), 100);

· fieldName = XContentMapValues.nodeStringValue( simpleSettings.get("field"), "test");

· poll = XContentMapValues.nodeTimeValue( simpleSettings.get("poll"), TimeValue.timeValueMinutes(60));

· }

· logger.info("creating simple stream river for [{} numbers] with field [{}]", simpleNumber, fieldName);

· if (settings.settings().containsKey("index")) {

· Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");

· indexName = XContentMapValues.nodeStringValue( indexSettings.get("index"), riverName.name());

· typeName = XContentMapValues.nodeStringValue( indexSettings.get("type"), "simple_type");

· bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_size"), 100);

· bulkThreshold = XContentMapValues.nodeIntegerValue( indexSettings.get("bulk_threshold"), 10);

· } else {

· indexName = riverName.name();

· typeName = "simple_type";

· bulkSize = 100;

· bulkThreshold = 10;

· }

}

· This is the code for the start function that manages the start of the river and initializes the bulk processor:

· @Override

· public void start(){

· logger.info("starting simple stream");

· bulkProcessor = BulkProcessor.builder(client,new BulkProcessor.Listener() {…truncated… }).setBulkActions(bulkSize). setFlushInterval(TimeValue.timeValueMinutes(5)).setConcurrentRequests(bulkThreshold).build();

· thread = EsExecutors.daemonThreadFactory( settings.globalSettings(), "Simple processor").newThread(new SimpleConnector());

· thread.start();

}

· The following code is used for the close function, which cleans up internal states before exiting:

· @Override

· public void close() {

· logger.info("closing simple stream river");

· bulkProcessor.close();

· this.closed = true;

· thread.interrupt();

}

· This code shows a wait function used to reduce the throughput:

· private void delay() {

· if (poll.millis() > 0L) {

· logger.info("next run waiting for {}", poll);

· try {

· Thread.sleep(poll.millis());

· } catch (InterruptedException e) {

· logger.error("Error during waiting.", e, (Object) null);

· }

· }

}

· This is the code for a producer class that yields the item to be executed in bulk:

· private class SimpleConnector implements Runnable {

· @Override

· public void run() {

· while (!closed) {

· try {

· for (int i = 0; i < simpleNumber; i++) {

· XContentBuilder builder = XContentFactory.jsonBuilder();

· builder.startObject();

· builder.field(fieldName, i);

· builder.endObject();

· bulkProcessor.add(Requests.indexRequest(indexName) .type(typeName).id(UUID.randomUUID().toString()) .create(true).source(builder));

· }

· //in this case we force the bulking, but it should seldom be done

· bulkProcessor.flush();

· delay();

· } catch (Exception e) {

· logger.error(e.getMessage(), e, (Object) null);

· closed = true;

· }

· if (closed) {

· return;

· }

· }

· }

}

4. After having deployed the river plugin in an ElasticSearch cluster, we can activate it with a call similar to the one shown here:

curl -XPUT localhost:9200/_river/simple_river/_meta -d '
{
  "type" : "simple",
  "simple" : {
    "field" : "myfield",
    "number" : 1000
  },
  "index" : {
    "index" : "simple_data",
    "type" : "simple_type",
    "bulk_size" : 10,
    "bulk_threshold" : 50
  }
}'

How it works...

The river core is quite long but it covers a lot of interesting parts that are useful not only for the river, as follows:

· Processing the settings passed to a river

· Initializing a thread that populates the data (consumer) and its status management

· Executing a safe bulk index

A custom river class must extend the AbstractRiverComponent class and implement the interfaces defined in the River interface:

public class SimpleRiver extends AbstractRiverComponent implements River {

The river constructor accepts the following parameters:

· The RiverName object that contains the name defined in the /_river/<river_name>/_meta call.

· The river settings that are the settings passed as JSON.

· A client to send/receive data. For example, the native client of the previous chapter.

· A thread pool to control the thread allocation:

· @Inject

public SimpleRiver(RiverName riverName, RiverSettings settings, Client client, ThreadPool threadPool) {

We need to pass the river's name and settings to the parent constructor in order to initialize it:

super(riverName, settings);

Then, we need to store the client for future bulk operations:

this.client = client;

Now, we can check whether our river settings are available (the simple section in JSON):

if (settings.settings().containsKey("simple")) {

Next, we can extract the number of items to be created and populate the fields:

Map<String, Object> simpleSettings = (Map<String, Object>) settings.settings().get("simple");

simpleNumber = XContentMapValues.nodeIntegerValue(simpleSettings.get("number"), 100);

fieldName = XContentMapValues.nodeStringValue(simpleSettings.get("field"), "test");

}

The ElasticSearch content parser gives you a lot of useful functionalities to parse this kind of data. Usually, some index settings are specified to define the index that must be used to store the data, the type that must be used, and the parameters to control the following bulk operation:

if (settings.settings().containsKey("index")) {

Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");

indexName = XContentMapValues.nodeStringValue(indexSettings .get("index"), riverName.name());

typeName = XContentMapValues.nodeStringValue(indexSettings .get("type"), "simple_type");

bulkSize = XContentMapValues.nodeIntegerValue(indexSettings .get("bulk_size"), 100);

bulkThreshold = XContentMapValues.nodeIntegerValue(indexSettings .get("bulk_threshold"), 10);

It is good practice to provide default index names if they are not provided, as follows:

indexName = riverName.name();

typeName = "simple_type";

bulkSize = 100;

bulkThreshold = 10;

A river is internally seen as a service, so we need to provide the start and close methods. The start method initializes a bulk processor and starts the producer thread called SimpleConnector:

@Override

public void start() {

logger.info("starting simple stream");

bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener()

{ }).setBulkActions(bulkSize).setFlushInterval(TimeValue.timeValueMinutes(5)).setConcurrentRequests(bulkThreshold).build();

thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "Simple processor").newThread(new SimpleConnector());

thread.start();

}

The BulkProcessor APIs are convenient APIs introduced in the latest ElasticSearch versions to manage bulk jobs. They allow you to define the following (a sketch of the listener callbacks is shown after this list):

· The maximum number of bulk actions via setBulkActions

· A concurrent bulk limit via setConcurrentRequests

· A flush interval via setFlushInterval
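The listener passed to the builder is truncated in the recipe. A minimal sketch of it, using the standard BulkProcessor.Listener callbacks of ElasticSearch 1.x, could be the following:

bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        logger.info("going to execute a bulk of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        logger.info("bulk executed, failures: {}", response.hasFailures());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.error("bulk execution failed", failure);
    }
}).setBulkActions(bulkSize).setFlushInterval(TimeValue.timeValueMinutes(5)).setConcurrentRequests(bulkThreshold).build();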

The close method usually sets the status to closed and stops the producer thread:

@Override

public void close() {

logger.info("closing simple stream river");

bulkProcessor.close(); // it must be closed to flush the contained actions

this.closed = true;

thread.interrupt();

}

In the preceding code, a delay method is present; it is used to delay the producer thread in order to prevent the overloading of the ElasticSearch cluster. The plugin is generally composed of a producer thread, which produces data to be indexed, and a consumer thread (in this case, we have simplified this to a single bulk function), which consumes the data in bulk actions.

The core of the river is the producer thread that generates the index actions to be executed in bulk. This object runs in a thread and implements the Runnable interface:

private class SimpleConnector implements Runnable {

Obviously, the main method of this class is run:

@Override

public void run() {

When executing the run part in the thread, check whether the thread is active or closed (stopped):

while (!closed) {

The main part of the run method generates documents with the builder (as we have seen in the previous chapter) and then adds them to the bulk processor.

There's more…

Creating a river for the first time can be a bit long and complicated, but the base skeleton is reusable (it changes very little from river to river). While developing a river, most of the time is spent in designing and parsing the settings and in developing the run function of the producer thread. The other parts are often reused in a lot of rivers.

If you want to improve your knowledge of how to write rivers, some good examples are available on GitHub, and we have already seen some of them in Chapter 8, Rivers.

See also

· To learn more about rivers, see Chapter 8, Rivers

· The official ElasticSearch page that lists the most common rivers at http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-plugins.html#riverS