Elasticsearch: The Definitive Guide (2015)
Part VI. Modeling Your Data
Elasticsearch is a different kind of beast, especially if you come from the world of SQL. It comes with many benefits: performance, scale, near real-time search, and analytics across massive amounts of data. And it is easy to get going! Just download and start using it.
But it is not magic. To get the most out of Elasticsearch, you need to understand how it works and how to make it work for your needs.
Handling relationships between entities is not as obvious as it is with a dedicated relational store. The golden rule of a relational database—normalize your data—does not apply to Elasticsearch. In Chapter 40, Chapter 41, and Chapter 42 we discuss the pros and cons of the available approaches.
Then in Chapter 43 we talk about the features that Elasticsearch offers that enable you to scale out quickly and flexibly. Scale is not one-size-fits-all. You need to think about how data flows through your system, and design your model accordingly. Time-based data like log events or social network streams require a very different approach than more static collections of documents.
And finally, we talk about the one thing in Elasticsearch that doesn’t scale.
Chapter 40. Handling Relationships
In the real world, relationships matter: blog posts have comments, bank accounts have transactions, customers have bank accounts, orders have order lines, and directories have files and subdirectories.
Relational databases are specifically designed—and this will not come as a surprise to you—to manage relationships:
§ Each entity (or row, in the relational world) can be uniquely identified by a primary key.
§ Entities are normalized. The data for a unique entity is stored only once, and related entities store just its primary key. Changing the data of an entity has to happen in only one place.
§ Entities can be joined at query time, allowing for cross-entity search.
§ Changes to a single entity are atomic, consistent, isolated, and durable. (See ACID Transactions for more on this subject.)
§ Most relational databases support ACID transactions across multiple entities.
But relational databases do have their limitations, besides their poor support for full-text search. Joining entities at query time is expensive—more joins that are required, the more expensive the query. Performing joins between entities that live on different hardware is so expensive that it is just not practical. This places a limit on the amount of data that can be stored on a single server.
Elasticsearch, like most NoSQL databases, treats the world as though it were flat. An index is a flat collection of independent documents. A single document should contain all of the information that is required to decide whether it matches a search request.
While changing the data of a single document in Elasticsearch is ACIDic, transactions involving multiple documents are not. There is no way to roll back the index to its previous state if part of a transaction fails.
This FlatWorld has its advantages:
§ Indexing is fast and lock-free.
§ Searching is fast and lock-free.
§ Massive amounts of data can be spread across multiple nodes, because each document is independent of the others.
But relationships matter. Somehow, we need to bridge the gap between FlatWorld and the real world. Four common techniques are used to manage relational data in Elasticsearch:
§ Application-side joins
§ Data denormalization
§ Nested objects
§ Parent/child relationships
Often the final solution will require a mixture of a few of these techniques.
Application-side Joins
We can (partly) emulate a relational database by implementing joins in our application. For instance, let’s say we are indexing users and their blog posts. In the relational world, we would do something like this:
PUT /my_index/user/1
{
"name": "John Smith",
"email": "john@smith.com",
"dob": "1970/10/24"
}
PUT /my_index/blogpost/2
{
"title": "Relationships",
"body": "It's complicated...",
"user": 1
}
The index, type, and id of each document together function as a primary key.
The blogpost links to the user by storing the user’s id. The index and type aren’t required as they are hardcoded in our application.
Finding blog posts by user with ID 1 is easy:
GET /my_index/blogpost/_search
{
"query": {
"filtered": {
"filter": {
"term": { "user": 1 }
}
}
}
}
To find blogposts by a user called John, we would need to run two queries: the first would look up all users called John in order to find their IDs, and the second would pass those IDs in a query similar to the preceding one:
GET /my_index/user/_search
{
"query": {
"match": {
"name": "John"
}
}
}
GET /my_index/blogpost/_search
{
"query": {
"filtered": {
"filter": {
"terms": { "user": [1] }
}
}
}
}
The values in the terms filter would be populated with the results from the first query.
The main advantage of application-side joins is that the data is normalized. Changing the user’s name has to happen in only one place: the user document. The disadvantage is that you have to run extra queries in order to join documents at search time.
In this example, there was only one user who matched our first query, but in the real world we could easily have millions of users named John. Including all of their IDs in the second query would make for a very large query, and one that has to do millions of term lookups.
This approach is suitable when the first entity (the user in this example) has a small number of documents and, preferably, they seldom change. This would allow the application to cache the results and avoid running the first query often.
Denormalizing Your Data
The way to get the best search performance out of Elasticsearch is to use it as it is intended, by denormalizing your data at index time. Having redundant copies of data in each document that requires access to it removes the need for joins.
If we want to be able to find a blog post by the name of the user who wrote it, include the user’s name in the blog-post document itself:
PUT /my_index/user/1
{
"name": "John Smith",
"email": "john@smith.com",
"dob": "1970/10/24"
}
PUT /my_index/blogpost/2
{
"title": "Relationships",
"body": "It's complicated...",
"user": {
"id": 1,
"name": "John Smith"
}
}
Part of the user’s data has been denormalized into the blogpost document.
Now, we can find blog posts about relationships by users called John with a single query:
GET /my_index/blogpost/_search
{
"query": {
"bool": {
"must": [
{ "match": { "title": "relationships" }},
{ "match": { "user.name": "John" }}
]
}
}
}
The advantage of data denormalization is speed. Because each document contains all of the information that is required to determine whether it matches the query, there is no need for expensive joins.
Field Collapsing
A common requirement is the need to present search results grouped by a particular field. We might want to return the most relevant blog posts grouped by the user’s name. Grouping by name implies the need for a terms aggregation. To be able to group on the user’s whole name, the name field should be available in its original not_analyzed form, as explained in “Aggregations and Analysis”:
PUT /my_index/_mapping/blogpost
{
"properties": {
"user": {
"properties": {
"name": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}
}
}
The user.name field will be used for full-text search.
The user.name.raw field will be used for grouping with the terms aggregation.
Then add some data:
PUT /my_index/user/1
{
"name": "John Smith",
"email": "john@smith.com",
"dob": "1970/10/24"
}
PUT /my_index/blogpost/2
{
"title": "Relationships",
"body": "It's complicated...",
"user": {
"id": 1,
"name": "John Smith"
}
}
PUT /my_index/user/3
{
"name": "Alice John",
"email": "alice@john.com",
"dob": "1979/01/04"
}
PUT /my_index/blogpost/4
{
"title": "Relationships are cool",
"body": "It's not complicated at all...",
"user": {
"id": 3,
"name": "Alice John"
}
}
Now we can run a query looking for blog posts about relationships, by users called John, and group the results by user, thanks to the top_hits aggregation:
GET /my_index/blogpost/_search?search_type=count
{
"query": {
"bool": {
"must": [
{ "match": { "title": "relationships" }},
{ "match": { "user.name": "John" }}
]
}
},
"aggs": {
"users": {
"terms": {
"field": "user.name.raw",
"order": { "top_score": "desc" }
},
"aggs": {
"top_score": { "max": { "script": "_score" }},
"blogposts": { "top_hits": { "_source": "title", "size": 5 }}
}
}
}
}
The blog posts that we are interested in are returned under the blogposts aggregation, so we can disable the usual search hits by setting the search_type=count.
The query returns blog posts about relationships by users named John.
The terms aggregation creates a bucket for each user.name.raw value.
The top_score aggregation orders the terms in the users aggregation by the top-scoring document in each bucket.
The top_hits aggregation returns just the title field of the five most relevant blog posts for each user.
The abbreviated response is shown here:
...
"hits": {
"total": 2,
"max_score": 0,
"hits": []
},
"aggregations": {
"users": {
"buckets": [
{
"key": "John Smith",
"doc_count": 1,
"blogposts": {
"hits": {
"total": 1,
"max_score": 0.35258877,
"hits": [
{
"_index": "my_index",
"_type": "blogpost",
"_id": "2",
"_score": 0.35258877,
"_source": {
"title": "Relationships"
}
}
]
}
},
"top_score": {
"value": 0.3525887727737427
}
},
...
The hits array is empty because we set search_type=count.
There is a bucket for each user who appeared in the top results.
Under each user bucket there is a blogposts.hits array containing the top results for that user.
The user buckets are sorted by the user’s most relevant blog post.
Using the top_hits aggregation is the equivalent of running a query to return the names of the users with the most relevant blog posts, and then running the same query for each user, to get their best blog posts. But it is much more efficient.
The top hits returned in each bucket are the result of running a light mini-query based on the original main query. The mini-query supports the usual features that you would expect from search such as highlighting and pagination.
Denormalization and Concurrency
Of course, data denormalization has downsides too. The first disadvantage is that the index will be bigger because the _source document for every blog post is bigger, and there are more indexed fields. This usually isn’t a huge problem. The data written to disk is highly compressed, and disk space is cheap. Elasticsearch can happily cope with the extra data.
The more important issue is that, if the user were to change his name, all of his blog posts would need to be updated too. Fortunately, users don’t often change names. Even if they did, it is unlikely that a user would have written more than a few thousand blog posts, so updating blog posts with the scroll and bulk APIs would take less than a second.
However, let’s consider a more complex scenario in which changes are common, far reaching, and, most important, concurrent.
In this example, we are going to emulate a filesystem with directory trees in Elasticsearch, much like a filesystem on Linux: the root of the directory is /, and each directory can contain files and subdirectories.
We want to be able to search for files that live in a particular directory, the equivalent of this:
grep "some text" /clinton/projects/elasticsearch/*
This requires us to index the path of the directory where the file lives:
PUT /fs/file/1
{
"name": "README.txt",
"path": "/clinton/projects/elasticsearch",
"contents": "Starting a new Elasticsearch project is easy..."
}
The filename
The full path to the directory holding the file
NOTE
Really, we should also index directory documents so we can list all files and subdirectories within a directory, but for brevity’s sake, we will ignore that requirement.
We also want to be able to search for files that live anywhere in the directory tree below a particular directory, the equivalent of this:
grep -r "some text" /clinton
To support this, we need to index the path hierarchy:
§ /clinton
§ /clinton/projects
§ /clinton/projects/elasticsearch
This hierarchy can be generated automatically from the path field using the path_hierarchy tokenizer:
PUT /fs
{
"settings": {
"analysis": {
"analyzer": {
"paths": {
"tokenizer": "path_hierarchy"
}
}
}
}
}
The custom paths analyzer uses the path_hierarchy tokenizer with its default settings. See path_hierarchy tokenizer.
The mapping for the file type would look like this:
PUT /fs/_mapping/file
{
"properties": {
"name": {
"type": "string",
"index": "not_analyzed"
},
"path": {
"type": "string",
"index": "not_analyzed",
"fields": {
"tree": {
"type": "string",
"analyzer": "paths"
}
}
}
}
}
The name field will contain the exact name.
The path field will contain the exact directory name, while the path.tree field will contain the path hierarchy.
Once the index is set up and the files have been indexed, we can perform a search for files containing elasticsearch in just the /clinton/projects/elasticsearch directory like this:
GET /fs/file/_search
{
"query": {
"filtered": {
"query": {
"match": {
"contents": "elasticsearch"
}
},
"filter": {
"term": {
"path": "/clinton/projects/elasticsearch"
}
}
}
}
}
Find files in this directory only.
Every file that lives in any subdirectory under /clinton will include the term /clinton in the path.tree field. So we can search for all files in any subdirectory of /clinton as follows:
GET /fs/file/_search
{
"query": {
"filtered": {
"query": {
"match": {
"contents": "elasticsearch"
}
},
"filter": {
"term": {
"path.tree": "/clinton"
}
}
}
}
}
Find files in this directory or in any of its subdirectories.
Renaming Files and Directories
So far, so good. Renaming a file is easy—a simple update or index request is all that is required. You can even use optimistic concurrency control to ensure that your change doesn’t conflict with the changes from another user:
PUT /fs/file/1?version=2
{
"name": "README.asciidoc",
"path": "/clinton/projects/elasticsearch",
"contents": "Starting a new Elasticsearch project is easy..."
}
The version number ensures that the change is applied only if the document in the index has this same version number.
We can even rename a directory, but this means updating all of the files that exist anywhere in the path hierarchy beneath that directory. This may be quick or slow, depending on how many files need to be updated. All we would need to do is to use scan-and-scroll to retrieve all the files, and the bulk API to update them. The process isn’t atomic, but all files will quickly move to their new home.
Solving Concurrency Issues
The problem comes when we want to allow more than one person to rename files or directories at the same time. Imagine that you rename the /clinton directory, which contains hundreds of thousands of files. Meanwhile, another user renames the single file/clinton/projects/elasticsearch/README.txt. That user’s change, although it started after yours, will probably finish more quickly.
One of two things will happen:
§ You have decided to use version numbers, in which case your mass rename will fail with a version conflict when it hits the renamed README.asciidoc file.
§ You didn’t use versioning, and your changes will overwrite the changes from the other user.
The problem is that Elasticsearch does not support ACID transactions. Changes to individual documents are ACIDic, but not changes involving multiple documents.
If your main data store is a relational database, and Elasticsearch is simply being used as a search engine or as a way to improve performance, make your changes in the database first and replicate those changes to Elasticsearch after they have succeeded. This way, you benefit from the ACID transactions available in the database, and all changes to Elasticsearch happen in the right order. Concurrency is dealt with in the relational database.
If you are not using a relational store, these concurrency issues need to be dealt with at the Elasticsearch level. The following are three practical solutions using Elasticsearch, all of which involve some form of locking:
§ Global Locking
§ Document Locking
§ Tree Locking
TIP
The solutions described in this section could also be implemented by applying the same principles while using an external system instead of Elasticsearch.
Global Locking
We can avoid concurrency issues completely by allowing only one process to make changes at any time. Most changes will involve only a few files and will complete very quickly. A rename of a top-level directory may block all other changes for longer, but these are likely to be much less frequent.
Because document-level changes in Elasticsearch are ACIDic, we can use the existence or absence of a document as a global lock. To request a lock, we try to create the global-lock document:
PUT /fs/lock/global/_create
{}
If this create request fails with a conflict exception, another process has already been granted the global lock and we will have to try again later. If it succeeds, we are now the proud owners of the global lock and we can continue with our changes. Once we are finished, we must release the lock by deleting the global lock document:
DELETE /fs/lock/global
Depending on how frequent changes are, and how long they take, a global lock could restrict the performance of a system significantly. We can increase parallelism by making our locking more fine-grained.
Document Locking
Instead of locking the whole filesystem, we could lock individual documents by using the same technique as previously described. A process could use a scan-and-scroll request to retrieve the IDs of all documents that would be affected by the change, and would need to create a lock file for each of them:
PUT /fs/lock/_bulk
{ "create": { "_id": 1}}
{ "process_id": 123 }
{ "create": { "_id": 2}}
{ "process_id": 123 }
...
The ID of the lock document would be the same as the ID of the file that should be locked.
The process_id is a unique ID that represents the process that wants to perform the changes.
If some files are already locked, parts of the bulk request will fail and we will have to try again.
Of course, if we try to lock all of the files again, the create statements that we used previously will fail for any file that is already locked by us! Instead of a simple create statement, we need an update request with an upsert parameter and this script:
if ( ctx._source.process_id != process_id ) {
assert false;
}
ctx.op = 'noop';
process_id is a parameter that we pass into the script.
assert false will throw an exception, causing the update to fail.
Changing the op from update to noop prevents the update request from making any changes, but still returns success.
The full update request looks like this:
POST /fs/lock/1/_update
{
"upsert": { "process_id": 123 },
"script": "if ( ctx._source.process_id != process_id )
{ assert false }; ctx.op = 'noop';"
"params": {
"process_id": 123
}
}
If the document doesn’t already exist, the upsert document will be inserted—much the same as the create request we used previously. However, if the document does exist, the script will look at the process_id stored in the document. If it is the same as ours, it aborts the update (noop) and returns success. If it is different, the assert false throws an exception and we know that the lock has failed.
Once all locks have been successfully created, the rename operation can begin. Afterward, we must release all of the locks, which we can do with a delete-by-query request:
POST /fs/_refresh
DELETE /fs/lock/_query
{
"query": {
"term": {
"process_id": 123
}
}
}
The refresh call ensures that all lock documents are visible to the delete-by-query request.
Document-level locking enables fine-grained access control, but creating lock files for millions of documents can be expensive. In certain scenarios, such as this example with directory trees, it is possible to achieve fine-grained locking with much less work.
Tree Locking
Rather than locking every involved document, as in the previous option, we could lock just part of the directory tree. We will need exclusive access to the file or directory that we want to rename, which can be achieved with an exclusive lock document:
{ "lock_type": "exclusive" }
And we need shared locks on any parent directories, with a shared lock document:
{
"lock_type": "shared",
"lock_count": 1
}
The lock_count records the number of processes that hold a shared lock.
A process that wants to rename /clinton/projects/elasticsearch/README.txt needs an exclusive lock on that file, and a shared lock on /clinton, /clinton/projects, and /clinton/projects/elasticsearch.
A simple create request will suffice for the exclusive lock, but the shared lock needs a scripted update to implement some extra logic:
if (ctx._source.lock_type == 'exclusive') {
assert false;
}
ctx._source.lock_count++
If the lock_type is exclusive, the assert statement will throw an exception, causing the update request to fail.
Otherwise, we increment the lock_count.
This script handles the case where the lock document already exists, but we will also need an upsert document to handle the case where it doesn’t exist yet. The full update request is as follows:
POST /fs/lock/%2Fclinton/_update
{
"upsert": {
"lock_type": "shared",
"lock_count": 1
},
"script": "if (ctx._source.lock_type == 'exclusive')
{ assert false }; ctx._source.lock_count++"
}
The ID of the document is /clinton, which is URL-encoded to %2fclinton.
The upsert document will be inserted if the document does not already exist.
Once we succeed in gaining a shared lock on all of the parent directories, we try to create an exclusive lock on the file itself:
PUT /fs/lock/%2Fclinton%2fprojects%2felasticsearch%2fREADME.txt/_create
{ "lock_type": "exclusive" }
Now, if somebody else wants to rename the /clinton directory, they would have to gain an exclusive lock on that path:
PUT /fs/lock/%2Fclinton/_create
{ "lock_type": "exclusive" }
This request would fail because a lock document with the same ID already exists. The other user would have to wait until our operation is done and we have released our locks. The exclusive lock can just be deleted:
DELETE /fs/lock/%2Fclinton%2fprojects%2felasticsearch%2fREADME.txt
The shared locks need another script that decrements the lock_count and, if the count drops to zero, deletes the lock document:
if (--ctx._source.lock_count == 0) {
ctx.op = 'delete'
}
Once the lock_count reaches 0, the ctx.op is changed from update to delete.
This update request would need to be run for each parent directory in reverse order, from longest to shortest:
POST /fs/lock/%2Fclinton%2fprojects%2felasticsearch/_update
{
"script": "if (--ctx._source.lock_count == 0) { ctx.op = 'delete' } "
}
Tree locking gives us fine-grained concurrency control with the minimum of effort. Of course, it is not applicable to every situation—the data model must have some sort of access path like the directory tree for it to work.
NOTE
None of the three options—global, document, or tree locking—deals with the thorniest problem associated with locking: what happens if the process holding the lock dies?
The unexpected death of a process leaves us with two problems:
§ How do we know that we can release the locks held by the dead process?
§ How do we clean up the change that the dead process did not manage to complete?
These topics are beyond the scope of this book, but you will need to give them some thought if you decide to use locking.
While denormalization is a good choice for many projects, the need for locking schemes can make for complicated implementations. Instead, Elasticsearch provides two models that help us deal with related entities: nested objects and parent-child relationships.