ZooKeeper (2013)
Part II. Programming with ZooKeeper
Chapter 8. Curator: A High-Level API for ZooKeeper
At a high level, Curator is a set of libraries that build on top of ZooKeeper. One of the core goals of Curator is to manage the ZooKeeper handle for you, removing some (ideally all) of the complexity that connection management entails. As we have discussed in previous chapters, connection management is often tricky, and this is where Curator can come in handy.
Beyond managing the handle, Curator implements a set of recipes that developers commonly use, incorporating best practices and the handling of known edge cases. For example, Curator implements recipes for primitives such as locks, barriers, and caches. For ZooKeeper operations like create, delete, and getData, it streamlines programming by allowing us to chain calls, a programming style often called fluent. It also provides namespaces, automatic reconnection, and other facilities that make applications more robust.
The Curator components were originally implemented and contributed by Netflix, and Curator has recently been promoted to a top-level project of the Apache Software Foundation.
In this chapter, we cover the implementation of the master in our example using Curator features. Our goal is not to provide a detailed and extensive discussion of Curator, but simply to introduce it and highlight some of the features that are convenient to use with a ZooKeeper application. Check the project page for an extensive list of its features.
The Curator Client
Just as with ZooKeeper, before doing anything with Curator, we need to create a client. The client is typically an instance of CuratorFramework that we obtain by calling the Curator framework factory:
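RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); // e.g., 1 s base sleep, at most 3 retries
CuratorFramework zkc =
    CuratorFrameworkFactory.newClient(connectString, retryPolicy);
zkc.start(); // a Curator client must be started before it is used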
The connectString input parameter is the list of ZooKeeper servers we can connect to, just like when creating a ZooKeeper client. The retryPolicy parameter is a new feature of Curator. It enables the developer to specify a policy for retrying operations in the event of disconnections. Recall that with the regular ZooKeeper interface, we typically resubmit operations upon a connection loss event.
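To make the idea of a retry policy concrete, here is a minimal, stdlib-only sketch of what such a policy decides: whether another attempt is allowed, and how long to wait before it. SimpleBackoffPolicy, Attempt, ConnectionLossException, and RetryLoop are hypothetical stand-ins, not Curator's RetryPolicy interface. The backoff here is a deterministic doubling with a cap, chosen so that the example is predictable. Curator's own policies differ in detail.

```java
import java.util.function.LongConsumer;

// Hypothetical stand-in for ZooKeeper's connection-loss error
// (unchecked here only to keep the sketch short).
class ConnectionLossException extends RuntimeException {
    ConnectionLossException(String msg) { super(msg); }
}

// Hypothetical: one attempt of an operation that may hit connection loss.
interface Attempt<T> {
    T run();
}

// Hypothetical, simplified policy: doubling backoff, capped, bounded retries.
class SimpleBackoffPolicy {
    private final long baseSleepMs;
    private final long maxSleepMs;
    private final int maxRetries;

    SimpleBackoffPolicy(long baseSleepMs, long maxSleepMs, int maxRetries) {
        this.baseSleepMs = baseSleepMs;
        this.maxSleepMs = maxSleepMs;
        this.maxRetries = maxRetries;
    }

    // Is another attempt allowed after `retryCount` retries have already been made?
    boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    // Sleep before retry number `retryCount` (0-based): base * 2^retryCount, capped.
    long sleepMs(int retryCount) {
        long sleep = baseSleepMs << Math.min(retryCount, 30);
        return Math.min(sleep, maxSleepMs);
    }
}

class RetryLoop {
    // Runs `op`, resubmitting it on connection loss for as long as the policy allows.
    // The caller supplies `sleeper`, so tests can record delays instead of sleeping.
    static <T> T callWithRetries(Attempt<T> op, SimpleBackoffPolicy policy, LongConsumer sleeper) {
        int retryCount = 0;
        while (true) {
            try {
                return op.run();
            } catch (ConnectionLossException e) {
                if (!policy.allowRetry(retryCount)) {
                    throw e; // out of retries: surface the failure to the caller
                }
                sleeper.accept(policy.sleepMs(retryCount)); // back off before resubmitting
                retryCount++;
            }
        }
    }
}
```

In real code, the sleeper would call Thread.sleep. Passing a recorder instead lets us check the schedule without waiting.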
NOTE
Our example instantiates a CuratorFramework client. The factory class offers other ways to create an instance, and Curator has other client classes, but we don't cover them in detail here. One such class is CuratorZooKeeperClient, which provides some additional functionality on top of the ZooKeeper client, such as enabling operations that are safe in the face of unanticipated disconnections. Unlike with the CuratorFramework class, operations on a CuratorZooKeeperClient are executed directly against the ZooKeeper client handle.
Fluent API
A fluent API enables us to write code by chaining calls instead of relying upon a rigid signature scheme for invoking an operation. For example, with the standard ZooKeeper API, we create a znode synchronously by invoking something like:
zk.create("/mypath",
          new byte[0],
          ZooDefs.Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT);
With the fluent API of Curator, we make the same call this way:
zkc.create().withMode(CreateMode.PERSISTENT).forPath("/mypath", new byte[0]);
The create call returns a CreateBuilder instance, and the subsequent calls return objects of types that CreateBuilder extends. For example, CreateBuilder extends CreateModable<ACLBackgroundPathAndBytesable<String>>, and withMode is declared in the generic CreateModable<T> interface. Builders are available for the other operations as well (delete, setData, getData, checkExists, and getChildren) through the Curator framework client object.
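The following stdlib-only sketch mimics how such a chain is typed. Each step returns an interface that exposes only the calls allowed to come next. ModeSettable, PathAndBytesSettable, CreateStep, Mode, and the in-memory MiniClient are hypothetical simplifications in the spirit of the CreateBuilder/CreateModable<T> pattern described above. They are not Curator's actual types.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for CreateMode.
enum Mode { PERSISTENT, EPHEMERAL }

// Terminal step: supplies the path and data and performs the operation.
interface PathAndBytesSettable<T> {
    T forPath(String path, byte[] data);
}

// Optional step: sets the mode, then hands back the next step's type T.
interface ModeSettable<T> {
    T withMode(Mode mode);
}

// What create() returns: either set a mode first or go straight to forPath.
interface CreateStep extends ModeSettable<PathAndBytesSettable<String>>,
                             PathAndBytesSettable<String> {
}

// Hypothetical toy client backed by an in-memory map of path -> mode.
class MiniClient {
    final Map<String, Mode> nodes = new TreeMap<>();

    CreateStep create() {
        return new CreateStep() {
            private Mode mode = Mode.PERSISTENT; // default when withMode is skipped

            @Override
            public PathAndBytesSettable<String> withMode(Mode m) {
                this.mode = m;
                return this; // the same builder also implements the next step
            }

            @Override
            public String forPath(String path, byte[] data) {
                nodes.put(path, mode); // data is ignored in this toy store
                return path;
            }
        };
    }
}
```

Because withMode returns only PathAndBytesSettable<String>, a chain such as create().withMode(...).withMode(...) does not compile. The types enforce the legal call order.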
To execute the same operation asynchronously, we add inBackground to the chain, after withMode:
zkc.create().withMode(CreateMode.PERSISTENT).inBackground().forPath("/mypath",
    new byte[0]);
This call returns immediately, so we need one or more listeners to receive the result once the znode has been created. We discuss listeners and how to register them in the next section.
There are a few different ways to implement the callback for an asynchronous call. If we issue the previous chain of calls, the callback is delivered in the form of a CREATE event to registered listeners. The inBackground call optionally takes a context object, a concrete callback implementation to invoke, and even an executor (java.util.concurrent.Executor) to execute the callback. In Java, an executor is an object that executes submitted Runnable tasks. Here we can use one to decouple the execution of the callback from the callback thread of the ZooKeeper client. Using an executor is usually better than creating a new thread for each task.
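As a rough model of the background variant, the sketch below runs an operation off the caller's thread. It then hands the result, together with an optional context object, to a callback on a caller-supplied Executor. Event, Callback, and BackgroundRunner are hypothetical names that illustrate the pattern only. They are not Curator's BackgroundCallback or CuratorEvent types.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical event handed to the callback: type, path, and caller context.
class Event {
    final String type;
    final String path;
    final Object context;

    Event(String type, String path, Object context) {
        this.type = type;
        this.path = path;
        this.context = context;
    }
}

// Hypothetical callback interface.
interface Callback {
    void processResult(Event event);
}

class BackgroundRunner {
    // Plays the role of the client's I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    // Returns immediately. `op` runs on ioThread, and the callback is then
    // dispatched on `callbackExecutor`, so slow callbacks cannot stall I/O.
    void createInBackground(Supplier<String> op, Callback cb, Object ctx,
                            Executor callbackExecutor) {
        ioThread.execute(() -> {
            String createdPath = op.get();
            callbackExecutor.execute(
                () -> cb.processResult(new Event("CREATE", createdPath, ctx)));
        });
    }

    void shutdown() { ioThread.shutdown(); }
}
```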
To set a watch, we simply add watched to the call chain. For example:
zkc.getData().watched().inBackground().forPath("/mypath");
The notification triggered by the watcher is processed through listeners as well: it is passed to the registered listeners as a WATCHED event. It is also possible to replace watched with a call to usingWatcher, which takes a regular ZooKeeper Watcher object and calls it when it receives the notification. A third option is to pass a CuratorWatcher object. Unlike the process method of a ZooKeeper Watcher, the process method of CuratorWatcher may throw an exception.
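Recall that ZooKeeper watches are one-time triggers. A notification fires once, and the client must set the watch again to learn about later changes. The toy model below shows that behavior, with a watcher that is allowed to throw, as a CuratorWatcher may. The store catches and counts such failures instead of letting them escape. WatchableStore and ThrowingWatcher are hypothetical; this is neither ZooKeeper nor Curator code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical watcher whose process method may throw, like CuratorWatcher's.
interface ThrowingWatcher {
    void process(String path) throws Exception;
}

// Hypothetical in-memory store with one-shot watches.
class WatchableStore {
    private final Map<String, byte[]> data = new HashMap<>();
    private final Map<String, List<ThrowingWatcher>> watches = new HashMap<>();
    int watcherFailures = 0;

    // Reads the data and registers a one-shot watch on the path.
    byte[] getDataWatched(String path, ThrowingWatcher w) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(w);
        return data.get(path);
    }

    void setData(String path, byte[] value) {
        data.put(path, value);
        // Remove the watches before firing them: each one fires at most once.
        List<ThrowingWatcher> fired = watches.remove(path);
        if (fired == null) return;
        for (ThrowingWatcher w : fired) {
            try {
                w.process(path);
            } catch (Exception e) {
                watcherFailures++; // a real client would log or report this
            }
        }
    }
}
```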
Listeners
Listeners process events that the Curator library generates. To exercise this mechanism, the application implements one or more listeners and registers them with the Curator framework client. Events are delivered to all registered listeners.
The listener mechanism is generic and can be used for all manner of events that happen asynchronously. As we discussed in the previous section, Curator uses listeners to process callbacks and watch notifications. The mechanism also can be used to handle the exceptions generated by background tasks.
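A minimal sketch of such a listener registry follows. Every registered listener receives every event, and a listener that fails does not stop delivery to the others. ListenerRegistry, SimpleEvent, and SimpleEventType are hypothetical names used for illustration, and Curator's own listener containers differ in detail.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical event types, echoing the ones handled later in this chapter.
enum SimpleEventType { CREATE, DELETE, CHILDREN, WATCHED }

class SimpleEvent {
    final SimpleEventType type;
    final String path;

    SimpleEvent(SimpleEventType type, String path) {
        this.type = type;
        this.path = path;
    }
}

// Hypothetical registry that fans each event out to all listeners.
class ListenerRegistry<E> {
    // Copy-on-write, so listeners can be added while events are being delivered.
    private final List<Consumer<E>> listeners = new CopyOnWriteArrayList<>();
    private final List<Throwable> errors = new CopyOnWriteArrayList<>();

    void addListener(Consumer<E> l) { listeners.add(l); }

    void removeListener(Consumer<E> l) { listeners.remove(l); }

    // Delivers the event to every listener, isolating failures.
    void fire(E event) {
        for (Consumer<E> l : listeners) {
            try {
                l.accept(event);
            } catch (RuntimeException e) {
                errors.add(e); // recorded rather than rethrown
            }
        }
    }

    List<Throwable> errors() { return errors; }
}
```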
Let’s have a look at how to implement a listener that processes all callbacks and watch notifications for our master Curator example. The first step is to implement the template for a CuratorListener:
CuratorListener masterListener = new CuratorListener() {
    public void eventReceived(CuratorFramework client, CuratorEvent event) {
        try {
            switch (event.getType()) {
            case CHILDREN:
                ...
                break;
            case CREATE:
                ...
                break;
            case DELETE:
                ...
                break;
            case WATCHED:
                ...
                break;
            }
        } catch (Exception e) {
            LOG.error("Exception while processing event.", e);
            try {
                close();
            } catch (IOException ioe) {
                LOG.error("IOException while closing.", ioe);
            }
        }
    } // closes eventReceived
};
Because the goal here is just to illustrate the structure that we need to implement, we have omitted the code detail for each of the cases. Check the code examples that come with this book for more detail.
We next need to register the listener. For this we need a framework client, which we can create just like the first client we created:
client = CuratorFrameworkFactory.newClient(hostPort, retryPolicy);
Once we have the framework client, we register the listener as follows:
client.getCuratorListenable().addListener(masterListener);
A special kind of listener deals with errors reported when a background thread catches an exception. This is a low-level detail, but handling such errors might be necessary in your application. When it is, the application must implement a different kind of listener:
UnhandledErrorListener errorsListener = new UnhandledErrorListener() {
    public void unhandledError(String message, Throwable e) {
        LOG.error("Unrecoverable error: " + message, e);
        try {
            close();
        } catch (IOException ioe) {
            LOG.warn("Exception when closing.", ioe);
        }
    }
};
and register it with the framework client as follows:
client.getUnhandledErrorListenable().addListener(errorsListener);
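The sketch below models where such errors come from. A background worker runs tasks, and any exception that would otherwise vanish on that thread is handed to the registered error listeners together with a message. BackgroundWorker and ErrorListener are hypothetical stand-ins that mirror the shape of unhandledError(String, Throwable) shown above. They are not Curator classes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical; mirrors the shape of UnhandledErrorListener.unhandledError.
interface ErrorListener {
    void unhandledError(String message, Throwable e);
}

class BackgroundWorker {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final List<ErrorListener> errorListeners = new CopyOnWriteArrayList<>();

    void addErrorListener(ErrorListener l) { errorListeners.add(l); }

    // Submits a task. Exceptions are caught on the background thread and
    // reported to every error listener instead of being silently lost.
    Future<?> submit(String description, Runnable task) {
        return executor.submit(() -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                for (ErrorListener l : errorListeners) {
                    l.unhandledError("Background task failed: " + description, e);
                }
            }
        });
    }

    void shutdown() { executor.shutdown(); }
}
```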
Note that implementing listeners as event handlers, as we discussed in this section, is somewhat different from the way we proposed to implement ZooKeeper applications in previous chapters. For the master-worker example implemented directly on top of ZooKeeper (see A Common Pattern), we chain calls and callbacks, and each callback is handled by a different callback implementation. The callback implementations even have different types. With Curator, the details of a callback or a watch notification are encapsulated into an Event object, which makes it amenable to an implementation using a single event handler.
State Changes in Curator
Curator exposes a different set of states than ZooKeeper. It has, for example, a SUSPENDED state, and it uses LOST to represent session expiration. The state machine for the connection states is illustrated in Figure 8-1. When dealing with state changes, we generally recommend halting all operations of the master. We do not know whether the ZooKeeper client will be able to reconnect before the session expires, and even if it does, the client might no longer be the primary master. It is safer to play conservatively in the case of a disconnection.
Figure 8-1. Curator connection state machine
There is an additional READ_ONLY state, which is not relevant for our example. A connection goes into read-only mode if the ZooKeeper ensemble has read-only mode enabled and the server the client is connected to goes into read-only mode. A server transitions to read-only mode when it is partitioned away from the rest of the ensemble and cannot form a quorum with other servers. While the connection is in read-only mode, the client misses any updates that go through in the meantime. Such updates are possible if a subset of the ensemble is still able to form a quorum and receives update requests from other clients. A partition can last arbitrarily long (it is out of ZooKeeper's control), so the number of updates the client might miss is unbounded. Missing updates could lead to incorrect application behavior, so we strongly recommend thinking carefully about the consequences before enabling this mode. Note that read-only mode is not specific to Curator; it is a ZooKeeper option (see Chapter 10).
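One way to encode the conservative policy recommended above is a small gate that every master operation passes through. Operations are paused while the connection is SUSPENDED, allowed again on RECONNECTED, and refused permanently once the session is LOST. The ConnState enum mirrors Curator's connection state names, but MasterGate and its methods are hypothetical and deliberately simplified. For instance, the gate does not re-check leadership after reconnecting, which a real master should do. Treating READ_ONLY like SUSPENDED is our own conservative choice, in line with the warning about missed updates.

```java
// Hypothetical mirror of Curator's connection state names.
enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

// Hypothetical gate consulted before each master operation.
class MasterGate {
    private boolean paused = false;
    private boolean closed = false;

    // Reacts to a connection state change reported by the client.
    synchronized void stateChanged(ConnState s) {
        switch (s) {
            case CONNECTED:
            case RECONNECTED:
                paused = false;  // session survived: operations may resume
                break;
            case SUSPENDED:
            case READ_ONLY:
                paused = true;   // unsure of the session or of seeing all updates: hold off
                break;
            case LOST:
                closed = true;   // session expired: this master must shut down for good
                break;
        }
    }

    // Should a master operation be issued right now?
    synchronized boolean mayOperate() {
        return !closed && !paused;
    }

    synchronized boolean isClosed() { return closed; }
}
```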
A Couple of Edge Cases
There are a couple of interesting error scenarios that Curator handles nicely. The first one has to do with the presence of errors during the creation of sequential znodes, and the second one with errors when deleting a znode:
Sequential znodes
If the server the client is connected to crashes before returning the znode name (with the sequence number), or the client simply disconnects, then the client doesn't get a response even if the operation has been executed. As a consequence, the client doesn't know the path to the znode it created. Recall that we use sequential znodes, for example, in recipes that establish an order for participating clients. To address this problem, CreateBuilder provides a withProtection call that tells the Curator client to prefix the sequential znode with a unique identifier. If the create fails, the client retries the operation, and as part of retrying it verifies whether there is already a znode with the unique identifier.
Guaranteed deletes
A similar situation occurs with delete operations. If the client disconnects from the server while executing a delete operation, it doesn't know whether the delete operation has succeeded or not. If the presence of the znode being deleted indicates, for example, that a resource is locked, it is important to delete the znode to make sure that the resource is free to be used again. The Curator client provides a call that enables an application to make the execution of a delete operation guaranteed. The operation is guaranteed in the sense that the Curator client re-executes the operation until it succeeds, for as long as the Curator client instance is valid. To use this feature, we call guaranteed on the DeleteBuilder, for example client.delete().guaranteed().forPath(path).
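The toy model below illustrates both edge cases against an in-memory parent znode that can "lose" a reply after an operation has already taken effect. FlakyParent, ReplyLostException, EdgeCases, protectedCreate, and guaranteedDelete are hypothetical names. The tag format is illustrative and is not the prefix Curator actually uses, and Curator's own retry machinery differs in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BooleanSupplier;

// Hypothetical: the operation took effect on the server, but the reply was lost.
class ReplyLostException extends RuntimeException {}

// Hypothetical in-memory parent znode whose replies can be dropped on demand.
class FlakyParent {
    final List<String> children = new ArrayList<>();
    int dropReplies;          // how many upcoming replies to "lose"
    private int seq = 0;

    String createSequential(String prefix) {
        String name = prefix + String.format("%010d", seq++);
        children.add(name);   // the create takes effect...
        maybeDropReply();     // ...but the client may never hear back
        return name;
    }

    // Returns false if the child does not exist (ZooKeeper would report NONODE).
    boolean delete(String name) {
        boolean existed = children.remove(name);
        if (existed) maybeDropReply();
        return existed;
    }

    private void maybeDropReply() {
        if (dropReplies > 0) {
            dropReplies--;
            throw new ReplyLostException();
        }
    }
}

class EdgeCases {
    // Protected create: tag the name so a retry can find an earlier, unacknowledged create.
    static String protectedCreate(FlakyParent parent, String baseName, int maxAttempts) {
        String tag = "p-" + UUID.randomUUID() + "-"; // illustrative format only
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (attempt > 0) {
                for (String child : parent.children) {
                    if (child.startsWith(tag)) return child; // an earlier attempt succeeded
                }
            }
            try {
                return parent.createSequential(tag + baseName);
            } catch (ReplyLostException e) {
                // outcome unknown: loop and check before creating again
            }
        }
        throw new IllegalStateException("create did not complete");
    }

    // Guaranteed delete: retry while the client is open; "no such node" counts as done.
    static boolean guaranteedDelete(FlakyParent parent, String name, BooleanSupplier clientOpen) {
        while (clientOpen.getAsBoolean()) {
            try {
                parent.delete(name); // false means already gone, possibly via a lost reply
                return true;
            } catch (ReplyLostException e) {
                // outcome unknown: try again
            }
        }
        return false;
    }
}
```

Without the tag, the retry in protectedCreate would have no way to tell its own earlier znode from those of other clients, and it would create a duplicate.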
Recipes
Curator provides a variety of recipes, and we encourage you to have a look at the extensive list of recipes it implements. Here we discuss three recipes that we have used in the implementation of the Curator master: LeaderLatch, LeaderSelector, and PathChildrenCache.
Leader Latch
We can use the leader latch primitive to elect a master in our application. First, we need a LeaderLatch instance:
leaderLatch = new LeaderLatch(client, "/master", myId);
The constructor of LeaderLatch takes a Curator framework client, a ZooKeeper path for this leadership group, and an identifier for this master. To enable callbacks when this Curator client acquires or loses leadership, we need to register an implementation of the LeaderLatchListener interface. This interface has two methods: isLeader and notLeader. This is what our isLeader implementation looks like:
@Override
public void isLeader()
{
    ...
    /*
     * Start workersCache
     */
    workersCache.getListenable().addListener(workersCacheListener);
    workersCache.start();
    (new RecoveredAssignments(
            client.getZooKeeperClient().getZooKeeper())).recover(
        new RecoveryCallback() {
            public void recoveryComplete(int rc, List<String> tasks) {
                try {
                    if (rc == RecoveryCallback.FAILED) {
                        LOG.warn("Recovery of assigned tasks failed.");
                    } else {
                        LOG.info("Assigning recovered tasks");
                        recoveryLatch = new CountDownLatch(tasks.size());
                        assignTasks(tasks);
                    }
                    new Thread(new Runnable() {
                        public void run() {
                            try {
                                /*
                                 * Wait until recovery is complete
                                 */
                                recoveryLatch.await();
                                /*
                                 * Start tasks cache
                                 */
                                tasksCache.getListenable().
                                    addListener(tasksCacheListener);
                                tasksCache.start();
                            } catch (Exception e) {
                                // Java string literals cannot span lines; concatenate instead
                                LOG.warn("Exception while assigning " +
                                         "and getting tasks.",
                                         e);
                            }
                        }
                    }).start();
                } catch (Exception e) {
                    LOG.error("Exception while executing the recovery callback",
                              e);
                }
            }
        });
}
We start the workers cache before anything else to make sure that we have workers to assign tasks to.
Once we determine that we have tasks to assign that have not been assigned by the previous master, we proceed with assigning them.
We implement a barrier so that we wait until the assignment of recovered tasks ends before we move on to assigning new tasks. Otherwise, the new master ends up assigning all recovered tasks again. We wait in a separate thread so that we don't block the ZooKeeper client callback thread.
Once the master finishes with assigning recovered tasks, we start assigning new tasks.
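In essence, the barrier in these callouts is a CountDownLatch sized to the number of recovered tasks. Each completed reassignment counts it down, and a separate thread waits on it before turning on the processing of new tasks. This stdlib-only sketch uses hypothetical names (RecoveryBarrierDemo, assignAsync, recoverThenStart) and an executor in place of ZooKeeper callbacks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo of the recovery barrier.
class RecoveryBarrierDemo {
    final List<String> log = Collections.synchronizedList(new ArrayList<>());
    private final ExecutorService callbacks = Executors.newFixedThreadPool(2);

    // Pretend asynchronous assignment; counts the latch down when it "completes".
    void assignAsync(String task, CountDownLatch recoveryLatch) {
        callbacks.execute(() -> {
            log.add("assigned " + task);
            recoveryLatch.countDown();
        });
    }

    // Completes the returned future once new-task processing has been started.
    CompletableFuture<Void> recoverThenStart(List<String> recovered) {
        CountDownLatch recoveryLatch = new CountDownLatch(recovered.size());
        CompletableFuture<Void> started = new CompletableFuture<>();
        for (String t : recovered) assignAsync(t, recoveryLatch);
        // Wait on a separate thread so that the calling (callback) thread is never blocked.
        new Thread(() -> {
            try {
                recoveryLatch.await();           // barrier: all recovered tasks assigned
                log.add("tasks cache started");  // only now start handling new tasks
                started.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                started.completeExceptionally(e);
            }
        }).start();
        return started;
    }

    void shutdown() { callbacks.shutdown(); }
}
```

With zero recovered tasks, the latch starts at zero and await returns immediately, so processing of new tasks still starts.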
We implement this method as part of the CuratorMasterLatch class, and CuratorMasterLatch implements LeaderLatchListener. We need to register the listener before starting the latch. We do both in the runForMaster method, in addition to adding two other listeners, one for callbacks and watch events and one for errors:
public void runForMaster() {
    client.getCuratorListenable().addListener(masterListener);
    client.getUnhandledErrorListenable().addListener(errorsListener);
    leaderLatch.addListener(this);
    leaderLatch.start();
}
The notLeader method is called once the master loses leadership. In our example, we simply close everything there, which is sufficient for the purposes of this example. In a real application, you may need to clean up some local state and wait to become master again. If the LeaderLatch object is not closed, the Curator client will be considered for leadership again.
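Latch-style election in ZooKeeper is commonly built on ephemeral sequential znodes. Each participant creates one, and whoever owns the lowest sequence number is the leader. When that znode disappears, because the latch is closed or the session is lost, the next participant in line takes over. The in-memory model below illustrates only that rule. ElectionGroup and its methods are hypothetical, and the model omits the watches a real implementation sets on its predecessor.

```java
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory model of a leadership group.
class ElectionGroup {
    // sequence number -> participant id, ordered so that the lowest comes first
    private final TreeMap<Integer, String> members = new TreeMap<>();
    private int nextSeq = 0;

    // Like creating an ephemeral sequential znode; returns its sequence number.
    int join(String id) {
        int seq = nextSeq++;
        members.put(seq, id);
        return seq;
    }

    // Like closing the latch or losing the session: the znode goes away.
    void leave(int seq) {
        members.remove(seq);
    }

    // The participant holding the lowest sequence number is the leader.
    Optional<String> leader() {
        return members.isEmpty()
            ? Optional.empty()
            : Optional.of(members.firstEntry().getValue());
    }
}
```

A participant that rejoins receives a new, higher sequence number, so it goes to the back of the line rather than reclaiming leadership.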
Leader Selector
An alternative recipe for electing a master is LeaderSelector. The main difference between LeaderLatch and LeaderSelector is the listener interface they use. LeaderSelector uses LeaderSelectorListener instead, which defines a takeLeadership method and inherits stateChanged. To use it, we first need a LeaderSelector instance:
leaderSelector = new LeaderSelector(client, "/master", this);
The constructor of LeaderSelector takes a Curator framework client, a ZooKeeper path for the leadership group this master is participating in, and an implementation of LeaderSelectorListener. The leadership group is the group of Curator clients participating in the master election. The LeaderSelectorListener implementation must contain both a takeLeadership method and a stateChanged one. The takeLeadership method is executed upon acquiring leadership, and most of its code for our example is the same as the code for isLeader. In our case, we implement it as follows:
CountDownLatch leaderLatch = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1);
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
    ...
    /*
     * Start workersCache
     */
    workersCache.getListenable().addListener(workersCacheListener);
    workersCache.start();
    (new RecoveredAssignments(
            client.getZooKeeperClient().getZooKeeper())).recover(
        new RecoveryCallback() {
            public void recoveryComplete(int rc, List<String> tasks) {
                try {
                    if (rc == RecoveryCallback.FAILED) {
                        LOG.warn("Recovery of assigned tasks failed.");
                    } else {
                        LOG.info("Assigning recovered tasks");
                        recoveryLatch = new CountDownLatch(tasks.size());
                        assignTasks(tasks);
                    }
                    new Thread(new Runnable() {
                        public void run() {
                            try {
                                /*
                                 * Wait until recovery is complete
                                 */
                                recoveryLatch.await();
                                /*
                                 * Start tasks cache
                                 */
                                tasksCache.getListenable().
                                    addListener(tasksCacheListener);
                                tasksCache.start();
                            } catch (Exception e) {
                                // Java string literals cannot span lines; concatenate instead
                                LOG.warn("Exception while assigning " +
                                         "and getting tasks.",
                                         e);
                            }
                        }
                    }).start();
                    /*
                     * Decrement latch
                     */
                    leaderLatch.countDown();
                } catch (Exception e) {
                    LOG.error("Exception while executing the recovery callback",
                              e);
                }
            }
        });
    /*
     * This latch is to prevent this call from exiting. If we exit, then
     * we release mastership.
     */
    closeLatch.await();
}
We provide a separate CountDownLatch to wait until this Curator client acquires leadership.
If the master exits the takeLeadership call, it gives up mastership. We use a CountDownLatch to prevent it from exiting until we close the master.
We implement this method as part of the CuratorMaster class, and CuratorMaster implements LeaderSelectorListener. It is important that the master only exits takeLeadership if it wants to release mastership. We need, essentially, some form of lock to prevent it from exiting. In our implementation, we use a latch that we decrement when closing the master instance.
We also start the leader selector in the runForMaster call, but unlike with LeaderLatch, we do not need to register a listener here (we register the listener in the constructor instead):
public void runForMaster() {
    client.getCuratorListenable().addListener(masterListener);
    client.getUnhandledErrorListenable().addListener(errorsListener);
    leaderSelector.setId(myId);
    leaderSelector.start();
}
We additionally give this master an arbitrary identifier. Although we have not done so in this example, we could also set the leader selector to requeue automatically upon losing leadership, by calling autoRequeue on the LeaderSelector. Requeuing means that this client keeps trying to acquire leadership and executes takeLeadership each time it acquires leadership.
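Requeuing can be pictured as a loop around a mutual-exclusion lock. A participant acquires the lock, runs takeLeadership, releases the lock when that method returns, and then gets back in line. In this stdlib-only sketch, a fair java.util.concurrent.locks.ReentrantLock stands in for the distributed, ZooKeeper-based lock that this kind of election relies on. RequeueDemo and its counters are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of leader selection with automatic requeuing.
class RequeueDemo {
    // Fair lock: under contention, the longest-waiting thread acquires it next.
    private final ReentrantLock leadership = new ReentrantLock(true);
    final AtomicInteger currentLeaders = new AtomicInteger();
    final AtomicInteger maxConcurrentLeaders = new AtomicInteger();
    final AtomicInteger terms = new AtomicInteger();

    // Stand-in for takeLeadership: leadership lasts until this method returns.
    private void takeLeadership() {
        int now = currentLeaders.incrementAndGet();
        maxConcurrentLeaders.accumulateAndGet(now, Math::max);
        terms.incrementAndGet();
        currentLeaders.decrementAndGet();
    }

    // One participant that requeues after each term, `rounds` times.
    Runnable participant(int rounds) {
        return () -> {
            for (int i = 0; i < rounds; i++) {
                leadership.lock();           // wait in line for leadership
                try {
                    takeLeadership();
                } finally {
                    leadership.unlock();     // returning gives up leadership
                }
            }
        };
    }

    // Runs `n` participants to completion; returns true if all finished in time.
    boolean run(int n, int rounds) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        for (int i = 0; i < n; i++) pool.execute(participant(rounds));
        pool.shutdown();
        try {
            return pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```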
As part of implementing the LeaderSelectorListener interface, we implement a method to handle connection state changes:
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
    switch (newState) {
    case CONNECTED:
        // Nothing to do in this case.
        break;
    case RECONNECTED:
        // Reconnected, so I should
        // still be the leader.
        break;
    case SUSPENDED:
        LOG.warn("Session suspended");
        break;
    case LOST:
        try {
            close();
        } catch (IOException e) {
            LOG.warn("Exception while closing", e);
        }
        break;
    case READ_ONLY:
        // We ignore this case.
        break;
    }
}
All operations of the master go through ZooKeeper. While the connection is suspended, none of the master's operations will go through, so it is safe to do nothing.
If the session is lost, we simply close this master.
Children Cache
The last recipe we make use of in our example is the children cache (class PathChildrenCache). We use it both for the list of workers and for the list of tasks. This cache is responsible mainly for keeping a local copy of the list of children and for notifying us of changes to the cached set. Note that because of timing issues, the set might not be identical to the one ZooKeeper stores at a particular point in time, although it will eventually reflect changes to the ZooKeeper state.
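At its core, keeping such a cache means comparing the previous snapshot of child names with a freshly read one, and turning the difference into added and removed events. The sketch below does just that. ChildrenDiff, ChildEvent, and ChildEventType are hypothetical, and unlike PathChildrenCache the sketch neither tracks node data nor sets watches.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical event types for this sketch.
enum ChildEventType { CHILD_ADDED, CHILD_REMOVED }

class ChildEvent {
    final ChildEventType type;
    final String name;

    ChildEvent(ChildEventType type, String name) {
        this.type = type;
        this.name = name;
    }

    @Override
    public String toString() { return type + ":" + name; }
}

// Hypothetical snapshot-diffing cache of child names.
class ChildrenDiff {
    private Set<String> cached = new TreeSet<>();

    // Replaces the cached snapshot with `current` and reports what changed.
    List<ChildEvent> refresh(Collection<String> current) {
        Set<String> next = new TreeSet<>(current);
        List<ChildEvent> events = new ArrayList<>();
        for (String name : cached) {
            if (!next.contains(name)) events.add(new ChildEvent(ChildEventType.CHILD_REMOVED, name));
        }
        for (String name : next) {
            if (!cached.contains(name)) events.add(new ChildEvent(ChildEventType.CHILD_ADDED, name));
        }
        cached = next;
        return events;
    }
}
```

Diffing snapshots also shows why the cache may lag behind ZooKeeper: a child that is added and then removed between two reads never produces an event.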
To deal with changes for each instance of the cache, we implement the PathChildrenCacheListener interface, which has a single childEvent method. For the list of workers, the listener only needs to handle workers going away, because we must reassign their tasks. Additions to the list matter when we assign new tasks, and at that point we read the current workers from the cache itself:
PathChildrenCacheListener workersCacheListener = new PathChildrenCacheListener()
{
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
    {
        if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
            /*
             * Obtain just the worker's name
             */
            try {
                getAbsentWorkerTasks(event.getData().getPath().replaceFirst(
                        "/workers/", ""));
            } catch (Exception e) {
                LOG.error("Exception while trying to re-assign tasks.", e);
            }
        }
    }
};
For the list of tasks, we use additions to the list to trigger the assignment process:
PathChildrenCacheListener tasksCacheListener = new PathChildrenCacheListener() {
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
        if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
            try {
                assignTask(event.getData().getPath().replaceFirst("/tasks/", ""));
            } catch (Exception e) {
                LOG.error("Exception when assigning task.", e);
            }
        }
    }
};
Note that we make an assumption here that there is at least one worker available to assign tasks to. In the case that there is no worker available, we need to hold the assignment by remembering the additions to the list that have not been assigned and assign them upon an addition to the list of workers. We do not implement this feature for the sake of simplicity; we leave it as an exercise for the reader.
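One possible shape for that exercise is sketched below with hypothetical names (Assigner, onTaskAdded, onWorkerAdded). Tasks that arrive while no worker is available are parked in a queue and drained as soon as a worker appears. The round-robin worker selection is arbitrary and chosen purely for illustration. Reassigning the tasks of removed workers is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical assigner that holds tasks until a worker is available.
class Assigner {
    private final List<String> workers = new ArrayList<>();
    private final Deque<String> pending = new ArrayDeque<>();
    final Map<String, String> assignments = new LinkedHashMap<>(); // task -> worker
    private int next = 0;

    // Called for each CHILD_ADDED event on the tasks list.
    void onTaskAdded(String task) {
        if (workers.isEmpty()) {
            pending.add(task);   // nobody can take it yet: remember it
        } else {
            assign(task);
        }
    }

    // Called for each CHILD_ADDED event on the workers list.
    void onWorkerAdded(String worker) {
        workers.add(worker);
        while (!pending.isEmpty()) assign(pending.poll()); // drain the backlog
    }

    private void assign(String task) {
        String worker = workers.get(Math.floorMod(next++, workers.size())); // round-robin
        assignments.put(task, worker);
    }

    int pendingCount() { return pending.size(); }
}
```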
Takeaway Messages
Curator implements a set of nice extensions to the ZooKeeper API, abstracting away some of the complexities of ZooKeeper and implementing best practices gleaned from production experience and discussions in the community. In this chapter, we have covered how to leverage some of the features of Curator for the implementation of the master role in our master-worker example. We have particularly used the leader election implementations and the children cache to implement important features of the master. These two recipes are not the only ones Curator implements, however; a number of other recipes and features are available.