
Chapter 8. Rivers

In this chapter, we will cover the following recipes:

· Managing a river

· Using the CouchDB river

· Using the MongoDB river

· Using the RabbitMQ river

· Using the JDBC river

· Using the Twitter river

Introduction

There are two ways to put your data into ElasticSearch. As you have seen in previous chapters, one way is to use the index API, which allows you to store documents in ElasticSearch via the PUT/POST API or the bulk API. The other way is to use a service that fetches the data from an external source (once or periodically) and puts it in the cluster.

ElasticSearch calls these services rivers, and the ElasticSearch community provides several rivers to connect to different data sources, as follows:

· CouchDB

· MongoDB

· RabbitMQ

· SQL DBMS (Oracle, MySQL, PostgreSQL, and so on)

· Redis

· Twitter

· Wikipedia

Rivers are available as external plugins at http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-plugins.html#river, which provides an updated list of the available ElasticSearch rivers. In this chapter, we'll discuss how to manage a river (create, check, and delete) and how to configure the most common ones.

A river is a very handy tool to ingest data into ElasticSearch, but it has both advantages and disadvantages.

These are the main advantages of rivers:

· Built-in functionality in the ElasticSearch core (although it might be removed in the future)

· Automatic restart is managed by ElasticSearch on cluster startup, and automatic migration to another node in the event of a node failure

· Easily deployable as an ElasticSearch plugin

These are the main disadvantages of rivers:

· Failures or a malfunction in a river can cause the node or cluster to hang

· There is no river balancer, so some nodes can have a high overhead due to the execution of a river, reducing overall performance

· An update to a river requires a cluster restart

· It's very difficult to debug a river in a multiple node environment

The river system is a good tool to use for prototyping functionalities, but due to its issues, it can lead to cluster instability. A best practice is to execute data ingestion in a separate application. This approach is used, for example, in logstash, the log data ingestion system of ElasticSearch (http://www.elasticsearch.org/overview/logstash and http://logstash.net/).

Managing a river

In ElasticSearch, there are two main actions related to river setup: creating a river and deleting a river.

Getting ready

You will need a working ElasticSearch cluster.

How to do it...

A river is uniquely defined by a name and type. The river type is the type name defined in the loaded river plugins. After the name and type parameters, a river usually requires an extra configuration that can be passed in the _meta property.

In order to manage a river, perform the following steps:

1. To create a river (my_river), the HTTP method used is PUT (POST will work, too), as shown here:

curl -XPUT 'http://127.0.0.1:9200/_river/my_river/_meta' -d '{
  "type" : "dummy"
}'

Tip

The dummy type is a fake river, which is always installed in ElasticSearch.

2. This is how the result will look:

{"created":true,"_index":"_river","_type":"my_river","_id":"_meta","_version":1}

3. If you take a look at the ElasticSearch logs, you'll see some new lines, as shown here:

[2014-07-27 12:20:55,518][INFO ][cluster.metadata ] [White Pilgrim] [_river] creating index, cause [auto(index api)], shards [1]/[1], mappings []
[2014-07-27 12:20:55,557][INFO ][cluster.metadata ] [White Pilgrim] [_river] update_mapping [my_river] (dynamic)
[2014-07-27 12:20:56,569][INFO ][river.dummy ] [White Pilgrim] [dummy][my_river] create
[2014-07-27 12:20:56,569][INFO ][river.dummy ] [White Pilgrim] [dummy][my_river] start
[2014-07-27 12:20:56,582][INFO ][cluster.metadata ] [White Pilgrim] [_river] update_mapping [my_river] (dynamic)

4. To remove a river, use the DELETE HTTP method. If you consider the previously created river, this should be the REST call:

curl -XDELETE 'http://127.0.0.1:9200/_river/my_river/'

The following will be the result:

{"acknowledged":true}

5. If you take a look at the ElasticSearch logs, you'll see some new lines, as follows:

[2014-07-27 12:22:04,464][INFO ][cluster.metadata ] [White Pilgrim] [[_river]] remove_mapping [[my_river]]
[2014-07-27 12:22:04,466][INFO ][river.dummy ] [White Pilgrim] [dummy][my_river] close

How it works...

During an ElasticSearch node startup, the river service is automatically activated.

Depending on the river implementation, there are two different usages: one-shot and periodic. In one-shot usage, the river is created with some parameters, executes its process, and then removes itself once the process is complete. This approach is mainly used to process files, dumps, and any source that needs to be processed only once, as the data in it does not change. In periodic usage, the river waits for a time interval after it has processed all the data and then restarts, processing new data if available. This case is typical of data sources that are updated frequently, such as a DBMS, MongoDB, RabbitMQ, and Redis.

Rivers are stored in a special index, _river (in ElasticSearch, all special indices start with the _ character). The document type becomes the river name, and the _meta document is where the river configuration is stored.
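Since the river configuration is just a document, you can read it back with a standard GET call. A minimal sketch, assuming the my_river river created earlier:

# fetch the stored river configuration (the _meta document)
curl -XGET 'http://127.0.0.1:9200/_river/my_river/_meta'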

The _river index is automatically replicated in every cluster node.

When ElasticSearch receives a create river call, it creates the new river mapping and starts the river. Generally, the river is composed of the following components:

· Producer threads: These collect the documents to be indexed and send them to a consumer (thread)

· Consumer threads: These execute the bulk insertion of documents sent by the producer

When the river is started, these threads are also started, and the data is processed and sent to the cluster.

In our example, we can see that a river is started by taking a look at the ElasticSearch logfiles.

When you want to remove a river, the DELETE call removes it from execution. At the server level, ElasticSearch stops the river, flushes the stale data, and removes it from the _river index.

ElasticSearch always guarantees that a single river instance is running in the cluster (singleton). If the river is executed on a node and if this node should die, the river is rescheduled to work on another cluster node. It's the application logic of the river that keeps track of the river status and does not reprocess the same data.

There's more…

When a river is executing, a special document _status is available under the river name. This is a standard ElasticSearch document that can be fetched with the GET API.

For the preceding example, it's possible to check the status with this code:

curl -XGET 'http://127.0.0.1:9200/_river/my_river/_status'

The answer will be similar to the following:

{
  "_id": "_status",
  "_index": "_river",
  "_source": {
    "node": {
      "id": "I_mWzO-tRHWG-DpQOFuw4w",
      "name": "White Pilgrim",
      "transport_address": "inet[/127.0.0.1:9300]"
    }
  },
  "_type": "my_river",
  "_version": 1,
  "found": true
}

In the _source field, the node attribute indicates the node on which the river is executing. The status can also contain special river fields that describe the river's current position in its process (for example, the number of documents processed, the last river cycle, and so on).

See also

· The Installing a plugin manually recipe in Chapter 2, Downloading and Setting Up

Using the CouchDB river

CouchDB is a NoSQL data store that stores data in the JSON format, similar to ElasticSearch. It can be queried with map/reduce tasks and provides a REST API, so every operation can be done via HTTP API calls.

Using ElasticSearch to index and search CouchDB data is very handy, as it extends the CouchDB data store with Lucene's search capabilities.

Getting ready

You will need a working ElasticSearch cluster and a working CouchDB server to connect to.

How to do it...

In order to use the CouchDB river, perform the following steps:

1. First, you need to install the CouchDB river plugin, which is available on GitHub (https://github.com/elasticsearch/elasticsearch-river-couchdb), and is maintained by the ElasticSearch company. You can install the river plugin in the usual way:

bin/plugin -install elasticsearch/elasticsearch-river-couchdb/2.0.0

Tip

Internally, the CouchDB river plugin uses the attachment and JavaScript plugins, so it is a good practice to install them.
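For example, they can be installed with the usual plugin command; the version numbers below are illustrative, so pick the ones that match your ElasticSearch release:

# versions are illustrative; use the releases matching your ElasticSearch version
bin/plugin -install elasticsearch/elasticsearch-mapper-attachments/2.0.0
bin/plugin -install elasticsearch/elasticsearch-lang-javascript/2.0.0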

2. After a node restart, you can create a configuration (config.json) for your CouchDB river:

{
  "type": "couchdb",
  "couchdb": {
    "host": "localhost",
    "port": 5984,
    "db": "my_db",
    "filter": null
  },
  "index": {
    "index": "my_db",
    "type": "my_db",
    "bulk_size": "100",
    "bulk_timeout": "10ms"
  }
}

3. Now, create the river with this configuration:

curl -XPUT 'http://127.0.0.1:9200/_river/couchriver/_meta' -d @config.json

4. This is how the result will look:

{"_index":"_river","_type":"couchriver","_id":"_meta","_version":1,"created":true}

How it works...

The CouchDB river is designed to be fast at detecting changes and propagating them from CouchDB to ElasticSearch. It hooks into the _changes feed of CouchDB, so it does not create any overhead by polling the server and consumes as few resources as possible.

This approach prevents the execution of a lot of map/reduce queries on CouchDB to retrieve the new or changed documents.
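For illustration, the feed that the river consumes is equivalent to what you would get by querying CouchDB directly; a minimal sketch, assuming the my_db database from the earlier configuration:

# follow the continuous changes feed, including the full documents
curl 'http://localhost:5984/my_db/_changes?feed=continuous&include_docs=true&heartbeat=10000'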

To create a CouchDB river, the type must be set to couchdb. The following parameters must be passed to the couchdb object:

· protocol (by default, http; the valid values are http and https): This parameter defines the protocol to be used.

· no_verify (by default, false): If this parameter is set to true, the river will skip the HTTPS certificate validation.

· host (by default, localhost): This parameter defines the host server to be used.

· port (by default, 5984) : This parameter defines the CouchDB port number.

· heartbeat (by default, 10s) and read_timeout (by default, 30s): These parameters are used to control the HTTP connection timeouts.

· db (by default, the river name): This parameter defines the name of the database that is to be monitored.

· filter: This parameter defines some filters that can be applied to remove unwanted documents.

· filter_params: This parameter defines a list of keys/values used to filter out documents.

· ignore_attachments (by default, false): If this parameter is set to true, document attachments are skipped. Processing attachments requires the attachment plugin to be installed.

· user and password: These parameters, if defined, are used to authenticate the user to CouchDB.

· script: This is an optional script to be executed on documents.

The CouchDB river also provides good control over indexing by letting the user configure several index parameters in the index object, as follows:

· index: This parameter is the index name to be used

· type: This parameter is the type to be used

· bulk_size (by default, 100): This parameter is the number of bulk items to be collected before sending them as bulk

· bulk_timeout (by default, 10 milliseconds): If changes are detected within the bulk_timeout time, they are packed up to bulk_size before being sent

· max_concurrent_bulk (by default, 1): This parameter controls the count of the concurrent bulk requests that are to be executed

When the river starts, it initializes two threads:

· Slurper thread: This manages the connection between ElasticSearch and the CouchDB server. It continuously fetches the changes in CouchDB and puts them in a queue to be read by the indexer. Generally, this thread is called a producer.

· Indexer thread: This collects items from the queue and prepares the bulk to be indexed. It is often referred to as a consumer.

There's more…

The CouchDB river is very fast and well-designed. It has two important tools to improve the quality of your ingested documents: filter and script.

The filter, if applicable, allows you to filter documents in CouchDB's _changes stream, reducing the bandwidth and the number of documents that must be indexed. The filter can also be used to partition your CouchDB database; for example, it allows you to create rivers that use one index per user to store documents.

The script allows document manipulation before indexing. Typical scenarios cover adding/cloning/editing/joining fields, but other document manipulations are possible, limited only by the capabilities of the chosen scripting language; a combined example is sketched below.
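A minimal sketch of a river configuration that combines both tools; the app/by_user design-document filter is hypothetical, and the script body assumes the river exposes the CouchDB document as ctx.doc (check the plugin documentation for the exact script context):

{
  "type": "couchdb",
  "couchdb": {
    "host": "localhost",
    "port": 5984,
    "db": "my_db",
    "filter": "app/by_user",
    "filter_params": { "user": "alice" },
    "script": "ctx.doc.ingested = true"
  },
  "index": {
    "index": "my_db_alice",
    "type": "my_db"
  }
}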

See also

· The CouchDB river plugin's home page at https://github.com/elasticsearch/elasticsearch-river-couchdb

Using the MongoDB river

MongoDB is a very common NoSQL data store used all over the world. One of its main shortcomings is that it was not designed for text searching.

Although the latest MongoDB version provides full-text search, its completeness and functionality are far more limited than those of the current ElasticSearch version. So, it's quite common to use MongoDB as the data store and ElasticSearch for search. The MongoDB river, initially developed by me and now maintained by Richard Louapre, helps to create a bridge between these two applications.

Getting ready

You will need a working ElasticSearch cluster and a working MongoDB instance installed on the same machine as ElasticSearch, with replica sets enabled (http://docs.mongodb.org/manual/tutorial/deploy-replica-set/ and http://docs.mongodb.org/manual/tutorial/convert-standalone-to-replica-set/). You also need to restore the sample data available in mongodb/data with the following command:

mongorestore -d escookbook escookbook
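If you don't already have a replica set, a minimal single-node setup for local testing might look like the following sketch (the data path and the rs0 replica set name are illustrative); run it before the mongorestore command above:

# start a single mongod as a one-member replica set (illustrative path and name)
mongod --dbpath /data/db --replSet rs0 &
# initialize the replica set once the server is up
mongo --eval 'rs.initiate()'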

How to do it...

To use the MongoDB river, perform the following steps:

1. First, install the MongoDB river plugin, which is available on GitHub (https://github.com/richardwilly98/elasticsearch-river-mongodb). You can install the river plugin in the usual way:

bin/plugin -install richardwilly98/elasticsearch-river-mongodb

Tip

As the MongoDB river plugin internally uses the ElasticSearch attachment plugin (if you need to import documents from GridFS) and sometimes the JavaScript scripting language (if you want to use filtering), it is a good practice to install these plugins too.

2. Restart your ElasticSearch node to make sure the river plugin is loaded. In the log, this is what you should see:

[2014-08-04 15:39:29,705][INFO ][plugins ] [Dirtnap] loaded [river-twitter, transport-thrift, river-mongodb, mapper-attachments, lang-python, lang-javascript], sites [bigdesk, head]

3. You need to create a config.json file to be used to configure the river. In our case, we define a database and a collection to fetch the data from:

{
  "type" : "mongodb",
  "mongodb" : {
    "servers" : [
      { "host" : "localhost", "port" : 27017 }
    ],
    "db" : "escookbook",
    "collection" : "items"
  },
  "index" : {
    "name" : "items"
  }
}

4. Now, create the river with the current configuration:

curl -XPUT 'http://127.0.0.1:9200/_river/mongodbriver/_meta' -d @config.json

5. This is how the result will look:

{"_index":"_river","_type":"mongodbriver","_id":"_meta","_version":1,"created":true}

How it works...

The MongoDB river fetches data from a MongoDB instance and puts it in the current cluster. It's important that the MongoDB instance is correctly configured as a replica set, as the river works on the oplog collection. The oplog (operations log) collection is a special collection used to keep track of every MongoDB change. The river interprets the logged actions and replicates them in ElasticSearch. With this approach, it's not necessary to keep polling the MongoDB cluster, and no queries that could significantly reduce performance are required.
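To see the kind of data the river reads, you can inspect the oplog yourself on a replica set member; a minimal sketch (the local database and the oplog.rs collection are standard on replica set members):

# print the most recent oplog entry
mongo --quiet --eval 'printjson(db.getSiblingDB("local").oplog.rs.find().sort({$natural:-1}).limit(1).next())'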

Tip

The actual implementation of the MongoDB river is also compatible with TokuMX (https://github.com/Tokutek/mongo), a high performance distribution of MongoDB.

The ElasticSearch configuration used in the preceding example is quite simple. There are two main sections in the configuration:

· mongodb: This contains the MongoDB-related parameters. These are the most important ones:

· servers: This is a list of hosts and ports to connect to.

· credentials: This is a list of database credentials (db, user, and password). Take the following code as an example:

{"db":"mydatabase", "user":"username", "password":"myseceret"}

· db: This defines the database to be monitored.

· collection: This defines the collection to be monitored.

· gridfs: This defines a Boolean that indicates whether the collection is GridFS.

· filter: This defines an extra filter to filter out records.

· index: This defines the index where the documents have to be stored in ElasticSearch. The most important parameters that can be passed are as follows:

· name: This is the index name to be used

· type: This is the type to be used

If no mappings are defined, the river will autodetect the format from the MongoDB document.
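As an illustration of the parameters above, a hypothetical configuration that indexes a GridFS collection using credentials might look like this sketch (the database, collection, and index names are only examples):

{
  "type" : "mongodb",
  "mongodb" : {
    "servers" : [ { "host" : "localhost", "port" : 27017 } ],
    "credentials" : [ { "db" : "escookbook", "user" : "username", "password" : "password" } ],
    "db" : "escookbook",
    "collection" : "fs",
    "gridfs" : true
  },
  "index" : {
    "name" : "attachments",
    "type" : "files"
  }
}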

Note

One of the main advantages of using this plugin is that because it works on oplog, it always keeps the data updated without the MongoDB overhead.

See also

· The MongoDB river plugin's home page at https://github.com/richardwilly98/elasticsearch-river-mongodb

Using the RabbitMQ river

RabbitMQ is a fast message broker that can handle thousands of messages per second. It can be very handy to use in conjunction with ElasticSearch in order to bulk index the records.

The RabbitMQ river plugin is designed to wait for messages that contain a list of bulk operations. When a new message is delivered to RabbitMQ, the plugin delivers it to ElasticSearch to be executed.

Getting ready

You will need a working ElasticSearch cluster and a working RabbitMQ instance installed on the same machine as ElasticSearch.

How to do it...

In order to use the RabbitMQ river, perform the following steps:

1. First, you need to install the RabbitMQ river plugin, which is available on GitHub (https://github.com/elasticsearch/elasticsearch-river-rabbitmq). You can install the river plugin in the usual way:

bin/plugin -install elasticsearch/elasticsearch-river-rabbitmq/2.3.0

2. This is how the result should look:

-> Installing elasticsearch/elasticsearch-river-rabbitmq/2.3.0...
Trying http://download.elasticsearch.org/elasticsearch/elasticsearch-river-rabbitmq/elasticsearch-river-rabbitmq-2.3.0.zip...
Downloading ....................DONE
Installed river-rabbitmq

3. Restart your ElasticSearch node to ensure that the river plugin is loaded. In the log, you should see the following line:

[2013-10-14 23:08:43,639][INFO ][plugins ] [Fault Zone] loaded [river-rabbitmq, river-twitter, transport-thrift, river-mongodb, mapper-attachments, lang-python, river-couchdb, lang-javascript], sites [bigdesk, head]

4. You need to create a configuration file (config.json) to be used to configure the river:

{
  "type" : "rabbitmq",
  "rabbitmq" : {
    "host" : "localhost",
    "port" : 5672,
    "user" : "guest",
    "pass" : "guest",
    "vhost" : "/",
    "queue" : "elasticsearch",
    "exchange" : "elasticsearch",
    "routing_key" : "elasticsearch",
    "exchange_declare" : true,
    "exchange_type" : "direct",
    "exchange_durable" : true,
    "queue_declare" : true,
    "queue_bind" : true,
    "queue_durable" : true,
    "queue_auto_delete" : false,
    "heartbeat" : "30m",
    "nack_errors" : "true"
  },
  "index" : {
    "bulk_size" : 100,
    "bulk_timeout" : "10ms",
    "ordered" : false,
    "replication" : "default"
  }
}

5. Now, create the river with the current configuration:

curl -XPUT 'http://127.0.0.1:9200/_river/rabbitriver/_meta' -d @config.json

6. This will be the result:

{"_index":"_river","_type":"rabbitriver","_id":"_meta","_version":1,"created":true}

How it works...

The RabbitMQ river opens a connection to the RabbitMQ server and waits for messages to process. The only kind of messages that the plugin is able to process are bulk operation messages.

Note

Every bulk operation must terminate with a newline (\n); otherwise, the last operation might not be processed.

Typically, the connection between RabbitMQ and ElasticSearch is a direct connection, which means that as the message is sent to the RabbitMQ server, it is redirected to ElasticSearch.
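For illustration, the body of a message published to the elasticsearch queue is simply text in the bulk API format, with every action line terminated by a newline; the index, type, and field names below are only examples:

{ "index" : { "_index" : "myindex", "_type" : "mytype", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "myindex", "_type" : "mytype", "_id" : "2" } }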

The river type is rabbitmq, and all client configurations live on the rabbitmq object. These are the most common parameters for the RabbitMQ river:

· host (by default, localhost): This defines the RabbitMQ server's address.

· port (by default, 5672): This defines the RabbitMQ server's port.

· user and pass: These define the user and password credentials required to access the RabbitMQ server, respectively.

· vhost (by default, /): This defines the RabbitMQ virtual host to be used.

· exchange_declare (false/true) and exchange (by default, elasticsearch): These control whether the exchange must be declared and the exchange name, respectively.

· exchange_type (by default, direct): This defines the type of exchange to be used.

· exchange_durable (by default, true): If this is true, the exchange is durable and survives a RabbitMQ broker restart; otherwise, it is transient.

· queue_declare (false/true) and queue (by default, elasticsearch): These control whether the queue must be declared and the queue name, respectively.

· queue_durable (by default, true): If this is true, the queue is durable and survives a RabbitMQ broker restart; otherwise, it is transient.

· queue_auto_delete (by default, false): If this is set to true, the queue is automatically deleted when it is no longer in use (no consumers and no messages remaining).

· heartbeat: This controls the heartbeat delay in the connection. It's used to prevent connection dropping if there is network inactivity.

· nack_errors (by default, false): If this is true, failures in bulk actions are skipped; otherwise, failed messages are marked as rejected (nack) and reprocessed.

Sometimes, the RabbitMQ server is configured in cluster mode for high availability. In this configuration, there is no single host, but a list of multiple hosts. They can be defined in a list of addresses in this way:

{
  "rabbitmq" : {
    "addresses" : [
      {
        "host" : "host1",
        "port" : 5672
      },
      {
        "host" : "host2",
        "port" : 5672
      }
    ]
  }
}

There's more…

The RabbitMQ river plugin, together with scripting, allows you to control two important aspects of bulk processing: the whole bulk body with the bulk_script_filter function, and every single document that must be indexed or created with script_filter. Both script filters accept the same standard definition.

The following are the parameters:

· script: This is the code of the script

· script_lang: This is the language to be used to interpret the code

· script_params: This is a dictionary/map of key-value pairs containing the additional parameters to be passed to the script

The bulk_script_filter function will receive a block of text (body) that is the text of a list of actions. The script must return another block of text to be processed by ElasticSearch. If the script returns null, the bulk is skipped.

The following is an example of the bulk_script_filter declaration:

{
  "type" : "rabbitmq",
  "rabbitmq" : {
  },
  "index" : {
    ...
  },
  "bulk_script_filter" : {
    "script" : "myscript",
    "script_lang" : "native",
    "script_params" : {
      "param1" : "val1",
      "param2" : "val2"
      ...
    }
  }
}

If a script_filter function is defined, a ctx context is passed to the script for every document, which must be indexed or created.

The following is an example of the script_filter declaration:

{
  "type" : "rabbitmq",
  "rabbitmq" : {
    ...
  },
  "index" : {
    ...
  },
  "script_filter" : {
    "script" : "ctx.type1.field1 += param1",
    "script_lang" : "groovy",
    "script_params" : {
      "param1" : 1
    }
  }
}

The RabbitMQ broker is a very powerful tool that supports high load and balancing, moving the peak load onto the RabbitMQ message queue. A RabbitMQ message queue processes inserts far faster than ElasticSearch, because a message queue system doesn't need to index the data. So, it can be a good frontend to absorb ElasticSearch indexing peaks and also to allow the execution of delayed bulks if an ElasticSearch node is down.

See also

· The RabbitMQ river documentation at https://github.com/elasticsearch/elasticsearch-river-rabbitmq

· The Managing a river recipe in this chapter

Using the JDBC river

Generally, application data is stored in a DBMS of some kind (Oracle, MySQL, PostgreSQL, Microsoft SQL Server, and SQLite, among others). To power up a traditional application with the advanced search capabilities of ElasticSearch and Lucene, all this data must be imported into ElasticSearch. The JDBC river by Jörg Prante allows you to connect to a DBMS, execute queries, and index the results.

This plugin can work both as a standard river or as a standalone feeder, so the ingestion part can be executed independently of ElasticSearch.

Getting ready

You will need a working ElasticSearch cluster.

How to do it...

In order to use the JDBC river, perform the following steps:

1. First, you need to install the JDBC river plugin, which is available on GitHub (https://github.com/jprante/elasticsearch-river-jdbc). You can install the river plugin using the following command:

bin/plugin -url http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.4.4/elasticsearch-river-jdbc-1.3.4.4-plugin.zip -install river-jdbc

2. This is how the result should look:

-> Installing river-jdbc...
Trying http://xbib.org/repository/org/xbib/elasticsearch/plugin/elasticsearch-river-jdbc/1.3.4.4/elasticsearch-river-jdbc-1.3.4.4-plugin.zip...
Downloading … .....DONE
Installed river-jdbc into …/elasticsearch/plugins/river-jdbc

Tip

The JDBC river plugin does not bundle the DBMS drivers, so you need to download them and put them in the plugin directory (typically, $ES_HOME/plugins/river-jdbc).
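For example, once you have downloaded a driver (such as the PostgreSQL jar mentioned in the next step), copying it into place might look like this sketch, with $ES_HOME standing in for your ElasticSearch installation directory:

# assumption: $ES_HOME points at your ElasticSearch installation
cp postgresql-9.2-1003.jdbc4.jar $ES_HOME/plugins/river-jdbc/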

3. If you need to use PostgreSQL, you need to download the driver from http://jdbc.postgresql.org/download.html. The direct link to the current driver is available at http://jdbc.postgresql.org/download/postgresql-9.2-1003.jdbc4.jar.

4. If you need to use MySQL, you need to download the driver from http://dev.mysql.com. The direct link to the current driver is available at http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.26.zip/from/http://cdn.mysql.com/.

5. Restart your ElasticSearch node to ensure that the river plugin is loaded. In the log, you should see the following lines:

…
[2014-10-18 14:59:10,143][INFO ][node ] [Fight-Man] initializing ...
[2014-10-18 14:59:10,163][INFO ][plugins ] [Fight-Man] loaded [river-twitter, transport-thrift, jdbc-1.3.4.4], sites []

6. You need to create a configuration file (config.json) to configure the river. In our case, we define a PostgreSQL database, items, and an items table to fetch the data from:

{
  "type" : "jdbc",
  "jdbc" : {
    "strategy" : "oneshot",
    "driver" : "org.postgresql.Driver",
    "url" : "jdbc:postgresql://localhost:5432/items",
    "user" : "username",
    "password" : "password",
    "sql" : "select * from items",
    "schedule" : "1h",
    "scale" : 0,
    "autocommit" : false,
    "fetchsize" : 100,
    "max_rows" : 0,
    "max_retries" : 3,
    "max_retries_wait" : "10s",
    "locale" : "it",
    "digesting" : true
  },
  "index" : {
    "index" : "jdbc",
    "type" : "jdbc",
    "max_bulk_actions" : 1000,
    "max_concurrent_bulk_requests" : 10,
    "versioning" : false,
    "acknowledge" : false
  }
}

7. Now, create the river with the current configuration:

curl -XPUT 'http://127.0.0.1:9200/_river/jdbcriver/_meta' -d @config.json

8. The following result will be obtained:

{"_index":"_river","_type":"jdbcriver","_id":"_meta","_version":1,"created":true}

How it works...

The JDBC river is a very versatile river that has a lot of options and covers a large number of common database-related scenarios. It works with every database system that provides a JDBC driver; because it ships without built-in drivers, they must be installed separately, usually in the river's plugin directory.

The common flow to use the JDBC river is to provide a connection and an SQL query to fetch SQL records that will be converted to ElasticSearch records.

The river type is jdbc and all the client configurations live on the jdbc object. These are the most common parameters:

· strategy (by default, simple): This is the strategy used by the JDBC river; currently, the following strategies are implemented:

· simple: This fetches data with the SQL query, indexes the data in ElasticSearch, waits for the next poll interval, and then restarts the cycle.

· column: This fetches all the records of a table without using SQL.

· driver: This is the JDBC driver class. Every JDBC driver defines its own class:

· MySQL: com.mysql.jdbc.Driver

· PostgreSQL: org.postgresql.Driver

· Oracle: oracle.jdbc.OracleDriver

· SQL server: com.microsoft.sqlserver.jdbc.SQLServerDriver

· url: This defines the JDBC URL for the driver.

· user: This defines the database user's name.

· password: This defines the database user's password.

· sql: This is the SQL statement (either a string or a list of statement objects). Typically, this is a select statement. If it ends with .sql, the statement is looked up in the ElasticSearch server's filesystem. A statement object (see the sketch after this list) is usually composed of the following:

· statement: This is usually the SQL select statement that queries the records

· parameter (an optional list of strings): This binds parameters to the SQL statement (in order)

· callable (by default, false): If this is true, the SQL statement is interpreted as a JDBC CallableStatement object

· locale (optional; by default, it is set to the server locale value): This is the default locale that is used to parse numeric and monetary values.

· timezone: This is the timezone for the JDBC setTimestamp() calls when binding parameters with timestamp values.

· rounding: This determines the rounding mode for numeric values, such as ceiling, down, floor, halfdown, halfeven, halfup, unnecessary, and up.

· scale: This sets the precision of numeric values.

· ignore_null_values (by default, false): If this is enabled, it ignores the NULL values when constructing the JSON document.

· autocommit (by default, false): If this is true, each statement is automatically committed; if it is false, statements are committed as a block.

· fetchsize: This is the fetch size for large result sets; most drivers implement fetchsize to control the number of rows kept in the buffer while iterating through the result set.

· max_rows: This limits the number of rows fetched by a statement; the remaining rows are ignored.

· max_retries: This defines the number of retries to connect/reconnect to a database. This is often used when there are problems with the DBMS to automatically reconnect if the connection is dropped.

· max_retries_wait (by default, "30s"): This is the specified wait time between retries.

· schedule: This is a single Cron expression or a list of Cron expressions used for scheduled execution. Visit the JDBC river plugin home page (https://github.com/jprante/elasticsearch-river-jdbc) for more information.

· cronpoolsize (by default, 4): This is the thread pool size for scheduled Cron job executions. If this is set to 1, jobs will be executed serially.
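A minimal sketch of the sql parameter expressed as a list of statement objects with bound parameters; the items table comes from the earlier example, while the price column and its value are purely illustrative:

{
  "type" : "jdbc",
  "jdbc" : {
    "driver" : "org.postgresql.Driver",
    "url" : "jdbc:postgresql://localhost:5432/items",
    "user" : "username",
    "password" : "password",
    "sql" : [
      {
        "statement" : "select * from items where price > ?",
        "parameter" : [ "10" ]
      }
    ]
  }
}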

The JDBC river also provides good control over indexing, letting the user set several index parameters in the index object, which are given as follows:

· index: This defines the ElasticSearch index used to index the data from JDBC.

· type: This defines the ElasticSearch type of the index used to index the data from JDBC.

· max_bulk_actions: This defines the length of each bulk index request submitted.

· max_concurrent_bulk_requests: This defines the maximum number of concurrent bulk requests. This setting controls the rate of bulk operations to prevent a DBMS or ElasticSearch overhead for very high fetches and index cycles.

· index_settings: This defines the optional settings for the ElasticSearch index.

· type_mapping: This defines an optional mapping for the ElasticSearch index type.

The JDBC river plugin has a lot of options, and selecting the correct ones depends on the particular scenario.

It's a very handy tool to import data from traditional relational databases without too much effort. If complex data manipulation on databases is required, it's better to implement custom river plugins to do the job.

There's more...

The JDBC river can be used as a standalone feeder for ElasticSearch. The feeder interface and the river interface share the same code and functionalities.

The river approach is a pull approach (it grabs the data from a location and puts it in ElasticSearch), while the feeder is a push approach (it sends the data to the ElasticSearch cluster, as logstash does, for example). In the bin/jdbc directory of the river, there are samples of JDBC feeder invocations.

It shares the same jdbc section as the river and also includes some other parameters that control the ElasticSearch client:

· elasticsearch: This is the ElasticSearch server to connect to. Generally, it's defined with the address (ip, port) and cluster name

· client (by default, bulk; the available values are bulk and node): This is the type of client that can be used

· concurrency (by default, 1): This is the number of concurrent pipelines to be executed

The following is an example of a feeder invocation script for a bash shell:

#!/bin/sh
java="/usr/bin/java"
echo '
{
    "concurrency" : 1,
    "elasticsearch" : "es://localhost:9300?es.cluster.name=elasticsearch",
    "client" : "bulk",
    "jdbc" : {
        …truncated…
    }
}
' | ${java} \
    -cp $(pwd):$(pwd)/\*:$(pwd)/../../lib/\* \
    org.xbib.elasticsearch.plugin.feeder.Runner \
    org.xbib.elasticsearch.plugin.feeder.jdbc.JDBCFeeder

The jdbc section is the same between the river and the feeder; it's mandatory to define the ElasticSearch server that must be used to index documents.

The main feeder entry point is the org.xbib.elasticsearch.plugin.feeder.Runner class, which requires a feeder class to instantiate (org.xbib.elasticsearch.plugin.feeder.jdbc.JDBCFeeder); the configuration is read from the standard input.

See also

· The JDBC river plugin's home page and documentation at https://github.com/jprante/elasticsearch-river-jdbc

· The Managing a river recipe in this chapter

Using the Twitter river

In the previous recipes, you saw rivers that fetch data from data stores such as SQL and NoSQL. In this recipe, we'll discuss how to use the Twitter river in order to collect tweets from Twitter and store them in ElasticSearch.

Getting ready

You will need a working ElasticSearch cluster and an OAuth Twitter token. To obtain the Twitter token, you need to log in to your Twitter developer account at https://dev.twitter.com/apps/ and create a new app at https://dev.twitter.com/apps/new.

How to do it...

In order to use the Twitter river, perform the following steps:

1. First, you need to install the Twitter river plugin, which is available on GitHub (https://github.com/elasticsearch/elasticsearch-river-twitter). You can install the river plugin using the following command:

bin/plugin -install elasticsearch/elasticsearch-river-twitter/2.4.0

2. The following result will be obtained:

-> Installing elasticsearch/elasticsearch-river-twitter/2.4.0...
Trying http://download.elasticsearch.org/elasticsearch/elasticsearch-river-twitter/elasticsearch-river-twitter-2.4.0.zip...
Downloading …....DONE
Installed river-twitter into …/elasticsearch/plugins/river-twitter

3. Restart your ElasticSearch node to ensure that the river plugin is loaded. In the log, you should see the following lines:

…
[2014-10-18 14:59:10,143][INFO ][node ] [Fight-Man] initializing ...
[2014-10-18 14:59:10,163][INFO ][plugins ] [Fight-Man] loaded [river-twitter, transport-thrift, jdbc-river], sites []

4. You need to create a configuration file (config.json) that can be used to configure the river, as follows:

{
  "type" : "twitter",
  "twitter" : {
    "oauth" : {
      "consumer_key" : "*** YOUR Consumer key HERE ***",
      "consumer_secret" : "*** YOUR Consumer secret HERE ***",
      "access_token" : "*** YOUR Access token HERE ***",
      "access_token_secret" : "*** YOUR Access token secret HERE ***"
    },
    "type" : "sample",
    "ignore_retweet" : true
  },
  "index" : {
    "index" : "my_twitter_river",
    "type" : "status",
    "bulk_size" : 100
  }
}

5. Now, create the river with the current configuration:

curl -XPUT 'http://127.0.0.1:9200/_river/twitterriver/_meta' -d @config.json

6. This is how the result will look:

{"_index":"_river","_type":"twitterriver","_id":"_meta","_version":1,"created":true}

How it works...

The Twitter river, after authenticating with the Twitter API, starts collecting tweets and sending them in bulk to ElasticSearch.
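Once tweets start flowing in, you can verify that they are being indexed with a simple search against the index and type defined in the earlier configuration; a minimal sketch:

# peek at the tweets collected so far
curl -XGET 'http://127.0.0.1:9200/my_twitter_river/status/_search?q=elasticsearch&pretty'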

The river type is twitter and all client configurations live in the twitter object. These are the most common parameters:

· oauth: This is an object that contains the four keys to access the Twitter API. These are generated when you create a Twitter application:

· consumer_key

· consumer_secret

· access_token

· access_token_secret

· type: This is one of the four types allowed by the Twitter API:

· sample: This type takes samples from public tweets

· user: This type listens to tweets in the authenticated user's timeline

· filter: This type allows you to filter tweets based on a criteria (check out https://dev.twitter.com/docs/api/1.1/post/statuses/filter)

· firehose: This type grabs all the public tweets

· raw (by default, false): If this is true, the tweets are indexed in ElasticSearch without any change. Otherwise, they are processed and cleaned by the ElasticSearch river. Take a look at https://github.com/elasticsearch/elasticsearch-river-twitter/blob/master/src/main/java/org/elasticsearch/river/twitter/Twitterriver.java (around line number 560) for more details.

· ignore_retweet (by default, false): If this is true, retweets are skipped.

There's more…

To control the Twitter flow, it's possible to define an additional filter object.

Defining a filter automatically switches the type to filter. The Twitter filter API allows you to define additional parameters to filter, as follows:

· tracks: This is a list of the keywords to be tracked

· follow: These are the IDs of Twitter users to be followed

· locations: These are a set of bounding boxes in the GeoJSON format (longitude, latitude) to track geographic sections

· language: This is a list of language codes to filter on

These are all the filter capabilities allowed by Twitter; they reduce the number of tweets sent to you and focus the stream on a particular segment.

This is how a filter river configuration will look:

{
  "type" : "twitter",
  "twitter" : {
    "oauth" : {
      "consumer_key" : "*** YOUR Consumer key HERE ***",
      "consumer_secret" : "*** YOUR Consumer secret HERE ***",
      "access_token" : "*** YOUR Access token HERE ***",
      "access_token_secret" : "*** YOUR Access token secret HERE ***"
    },
    "filter" : {
      "tracks" : ["elasticsearch", "cookbook", "packtpub"]
    }
  }
}

See also

· The Twitter river plugin's home page and documentation at https://github.com/elasticsearch/elasticsearch-river-twitter

· The Managing a river recipe in this chapter