ZooKeeper (2013)
Part II. Programming with ZooKeeper
Chapter 7. The C Client
Although the Java interface to ZooKeeper is the predominant one, the C ZooKeeper client binding is also popular among ZooKeeper developers and forms the foundation for bindings in other languages. This chapter focuses on this binding. To illustrate the development of ZooKeeper applications with the C API, we’ll reimplement the master of our master-worker example in C. The goal is to expose, through a concrete example, how the C API differs from the Java API.
The main reference for the C API is the zookeeper.h file in the ZooKeeper distribution, and the instructions to build the client library are given in the README file of the project distribution. Alternatively, you can use ant compile-native, which automates it all. Before going into code snippets, we’ll give a quick summary of how to set up the development environment to help you get started.
When we build the C client, it produces two libraries: one for multithreaded clients and the other for single-threaded clients. Most of this chapter assumes the multithreaded library is being used; we discuss the single-threaded version toward the end of the chapter, but we encourage the reader to focus on multithreaded implementations.
Setting Up the Development Environment
With Java, the ZooKeeper distribution ships precompiled JAR files that are ready to run on any platform with a Java runtime. With C, however, we need to build the required shared libraries before we can build our native ZooKeeper applications. Fortunately, ZooKeeper provides an easy way to build these libraries.
The easiest option to build the ZooKeeper native libraries is to use the ant build tool. In the directory where you unpacked the ZooKeeper distribution, there is a file called build.xml. This file has the instructions required for ant to build everything. You will also need automake, autoconf, and cppunit. These should be available in your host distribution if you are using Linux. Cygwin supplies the packages on Windows. On Mac OS X, you can use an open-source package manager such as Fink, Brew, or MacPorts.
Once all the needed applications are installed, you can build the ZooKeeper libraries using:
ant compile-native
Once the build finishes, you will find the libraries you need to link against in build/c/build/usr/lib and the include files you need in build/c/build/usr/include/zookeeper.
Starting a Session
To do anything with ZooKeeper, we first need a zhandle_t handle. To get a handle, we call zookeeper_init, which has the following signature:
ZOOAPI zhandle_t *zookeeper_init(const char *host,
                                 watcher_fn fn,
                                 int recv_timeout,
                                 const clientid_t *clientid,
                                 void *context,
                                 int flags);
String containing the host addresses of the ZooKeeper servers in the ensemble. The addresses are host:port pairs, and the pairs are comma-separated.
Watcher function for processing events, defined next in this section.
Session expiration time in milliseconds.
Client ID of a session that has been previously established and that this client is trying to reconnect to. To obtain the client ID of an established session, we call zoo_client_id. Specify 0 to start a new session.
Context object that is used with the returned zhandle_t handle.
There is no current use for this parameter, so it should be set to 0.
THE ZOOAPI DEFINITION
ZOOAPI is used to build ZooKeeper on Windows. The possible values for ZOOAPI are __declspec(dllexport), __declspec(dllimport), and empty. The keywords __declspec(dllexport) and __declspec(dllimport) export and import symbols to and from a DLL, respectively. If you are not building on Windows, leave ZOOAPI empty. In principle, nothing needs to be configured if you are building on Windows; the distribution configuration should be sufficient.
The call to zookeeper_init may return before the session establishment actually completes. Consequently, the session shouldn’t be considered established until a ZOO_CONNECTED_STATE event has been received. This event is processed with an implementation of the watcher function, which has the following signature:
typedef void (*watcher_fn)(zhandle_t *zh,
                           int type,
                           int state,
                           const char *path,
                           void *watcherCtx);
ZooKeeper handle that this call to the watcher function refers to.
Type of event: ZOO_CREATED_EVENT, ZOO_DELETED_EVENT, ZOO_CHANGED_EVENT, ZOO_CHILD_EVENT, or ZOO_SESSION_EVENT.
State of the connection.
Znode path for which the watch is triggered. If the event is a session event, the path is null.
Context object for the watcher.
Here is an example of how to implement a watcher:
static int connected = 0;
static int expired = 0;

void main_watcher (zhandle_t *zkh,
                   int type,
                   int state,
                   const char *path,
                   void* context)
{
    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            connected = 1;
        } else if (state == ZOO_NOTCONNECTED_STATE) {
            connected = 0;
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
            expired = 1;
            connected = 0;
            zookeeper_close(zkh);
        }
    }
}
Set connected upon receiving a ZOO_CONNECTED_STATE event.
Set expired (and close the session handle) upon receiving a ZOO_EXPIRED_SESSION_STATE event.
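With the multithreaded library, a client can simply poll these flags after calling zookeeper_init. Here is a minimal sketch, assuming the connected and expired globals above (sleep comes from <unistd.h>; a condition variable would avoid the busy wait, but this keeps the example short):

/* Wait until main_watcher reports that the session is established. */
while (!connected && !expired) {
    sleep(1);
}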
WATCH DATA STRUCTURES
ZooKeeper does not have a way to remove watches, so as long as watches are outstanding, it’s important to keep the associated watch data structures around even if the process no longer cares about the session, because the watcher and completion functions may still get invoked. In Java, garbage collection takes care of this automatically.
To put everything together, this is the init function we have for our master:
static zhandle_t *zh;  /* global session handle, used throughout the master */
static int server_id;

int init (char* hostPort) {
    srand(time(NULL));
    server_id = rand();
    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zh = zookeeper_init(hostPort,
                        main_watcher,
                        15000,
                        0,
                        0,
                        0);
    return errno;
}
Sets the server ID.
Sets the log severity level to output.
Call to create a session.
The first two lines set the seed for random number generation and the identifier of this master. We use server_id to distinguish different masters. (Recall that we can have one or more backup masters as well as a primary master.) Next, we set the severity level of the log messages to output. The logging implementation is homebrewed (see log.h); we have copied it from the ZooKeeper distribution (zookeeper_log.h) for convenience. Finally, we have the call to zookeeper_init, which makes main_watcher the function that processes session events.
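If the process later needs to attach another handle to the same session, it can retrieve the session’s client ID with zoo_client_id and hand it to a new zookeeper_init call, as mentioned earlier. A minimal sketch, assuming the zh handle and main_watcher above (the connect string is a placeholder):

/* Resume the established session under a second handle. */
const clientid_t *cid = zoo_client_id(zh);
zhandle_t *new_zh = zookeeper_init("127.0.0.1:2181", /* same ensemble */
                                   main_watcher, 15000, cid, 0, 0);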
Bootstrapping the Master
Bootstrapping the master refers to creating a few znodes used in the operation of the master-worker example and running for primary master. We first create four necessary znodes:
void bootstrap() {
    if(!connected) {
        LOG_WARN(("Client not connected to ZooKeeper"));
        return;
    }

    create_parent("/workers", "");
    create_parent("/assign", "");
    create_parent("/tasks", "");
    create_parent("/status", "");

    ...
}
If not yet connected, log that fact and return.
Create four parent znodes: /workers, /assign, /tasks, and /status.
And here’s the corresponding create_parent function:
void create_parent(const char * path,
                   const char * value) {
    zoo_acreate(zh,
                path,
                value,
                0,
                &ZOO_OPEN_ACL_UNSAFE,
                0,
                create_parent_completion,
                NULL);
}
Asynchronous call to create a znode. It passes a zhandle_t instance, which is a global static variable in our implementation.
The path of the znode to create, a parameter of type const char *.
The second parameter of the call is the data to store with the znode. We pass this data to create_parent just to illustrate that we need to pass it as the completion data of zoo_acreate in case we need to retry the operation. In our example, passing data to create_parent is not strictly necessary because it is empty in all four cases.
This parameter is the length of the value being stored (the previous parameter). In this case, we set it to zero.
We don’t care about ACLs in this example, so we just set it to be unsafe.
These parent znodes are persistent and not sequential, so we don’t pass any flags.
Because this is an asynchronous call, we pass a completion function that the ZooKeeper client calls upon completion of the operation.
The last parameter is the context of this call, but in this particular case, there is no context to be passed.
Because this is an asynchronous call, we pass a completion function to be called when the operation completes. The definition of the completion function is:
typedef void (*string_completion_t)(int rc,
                                    const char *value,
                                    const void *data);
rc is the return code, which appears in all completion functions.
value is the string returned.
data is context data passed by the caller when making an asynchronous call. Note that the programmer is responsible for freeing any heap space associated with the data pointer.
For this particular example, we have this implementation:
void create_parent_completion (int rc, const char *value, const void *data) {
    switch (rc) {
    case ZCONNECTIONLOSS:
        create_parent(value, (const char *) data);
        break;
    case ZOK:
        LOG_INFO(("Created parent node", value));
        break;
    case ZNODEEXISTS:
        LOG_WARN(("Node already exists"));
        break;
    default:
        LOG_ERROR(("Something went wrong when creating the parent node"));
        break;
    }
}
Check the return code to determine what to do.
Try again in the case of connection loss.
Most of the completion function consists simply of logging, to inform us of what is going on. In general, completion functions are a bit more complex, although it is good practice to split functionality across different completion functions, as we do in this example. Note that if the connection is lost repeatedly, this code ends up calling create_parent multiple times. This is not a recursive call, because the completion function is not invoked by create_parent itself. Also, create_parent simply calls a ZooKeeper function, so it has no side effects of the kind that come, for example, from allocating memory. If a completion function does have side effects, it is important to clean up before making another call from it.
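For instance, a completion function that owns heap-allocated context data should either hand ownership back to the retried call or release the data on every other path. A hypothetical sketch (retry_create and the ownership convention are illustrations, not part of the example code):

void some_completion(int rc, const char *value, const void *data) {
    if (rc == ZCONNECTIONLOSS) {
        /* Ownership passes back to the retried call, which hands the
         * same context to the next invocation of this completion. */
        retry_create((const char *) data);
    } else {
        /* On all other paths we are done with the context, so free it. */
        free((void *) data);
    }
}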
The next task is to run for master. Running for master basically involves trying to create the /master znode to lock in the primary master role. There are a few differences from the asynchronous create call we just discussed for parent znodes, though:
void run_for_master() {
    if(!connected) {
        LOG_WARN(LOGCALLBACK(zh),
                 "Client not connected to ZooKeeper");
        return;
    }

    char server_id_string[9];
    snprintf(server_id_string, 9, "%x", server_id);
    zoo_acreate(zh,
                "/master",
                (const char *) server_id_string,
                strlen(server_id_string),
                &ZOO_OPEN_ACL_UNSAFE,
                ZOO_EPHEMERAL,
                master_create_completion,
                NULL);
}
Store the server identifier in the /master znode.
We have to pass the length of the data being stored, which here is the length of the identifier string.
This znode is ephemeral, so we have to pass the ephemeral flag.
The completion function also has to do a bit more than the earlier one:
void master_create_completion (int rc, const char *value, const void *data) {
    switch (rc) {
    case ZCONNECTIONLOSS:
        check_master();
        break;
    case ZOK:
        take_leadership();
        break;
    case ZNODEEXISTS:
        master_exists();
        break;
    default:
        LOG_ERROR(LOGCALLBACK(zh),
                  "Something went wrong when running for master.");
        break;
    }
}
Upon connection loss, check whether a master znode has been created by this master or some other master.
If we have been able to create it, then take leadership.
If the master znode already exists (someone else has taken the lock), run a function to watch for the later disappearance of the znode.
If this master finds that /master already exists, it proceeds to set a watch with a call to zoo_awexists:
void master_exists() {
    zoo_awexists(zh,
                 "/master",
                 master_exists_watcher,
                 NULL,
                 master_exists_completion,
                 NULL);
}
Defines the watcher for /master.
Callback for this exists call.
Note that this call also allows us to pass a context to the watcher function: a (void *) that can point to some structure or variable representing the context of the call. We do not make use of it in this case.
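For example, the caller could hand the watcher a small bookkeeping structure through that pointer. A hypothetical sketch (exists_ctx and its use are illustrations, not part of the example code):

struct exists_ctx {
    int notifications;  /* how many times the watcher has fired */
};
static struct exists_ctx ctx;

zoo_awexists(zh,
             "/master",
             master_exists_watcher,
             (void *) &ctx,    /* delivered to the watcher as watcherCtx */
             master_exists_completion,
             NULL);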
Our implementation of the watcher function that processes the notification when the znode is deleted is the following:
void master_exists_watcher (zhandle_t *zh,
                            int type,
                            int state,
                            const char *path,
                            void *watcherCtx) {
    if (type == ZOO_DELETED_EVENT) {
        assert( !strcmp(path, "/master") );
        run_for_master();
    } else {
        LOG_DEBUG(LOGCALLBACK(zh),
                  "Watched event: %s", type2string(type));
    }
}
If /master gets deleted, run for master.
Back to the master_exists call. The completion function we implement is simple and follows the pattern we have been using thus far. One small but important detail to note is that between the execution of the call to create /master and the execution of this exists request, the /master znode may have been deleted (i.e., the previous primary master may have gone away). Consequently, the completion function verifies that the znode exists and, if it does not, the client runs for master again:
void master_exists_completion (int rc,
                               const struct Stat *stat,
                               const void *data) {
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        master_exists();
        break;
    case ZOK:
        if(stat == NULL) {
            LOG_INFO(LOGCALLBACK(zh),
                     "Previous master is gone, running for master");
            run_for_master();
        }
        break;
    default:
        LOG_WARN(LOGCALLBACK(zh),
                 "Something went wrong when executing exists: %s",
                 rc2string(rc));
        break;
    }
}
Checks whether the znode exists by checking whether stat is null.
Runs for master again if the znode is gone.
Once the master determines it is the primary, it takes leadership, as we explain next.
Taking Leadership
Once the master is elected primary, it starts exercising its role. It first gets the list of available workers:
void take_leadership() {
    get_workers();
}

void get_workers() {
    zoo_awget_children(zh,
                       "/workers",
                       workers_watcher,
                       NULL,
                       workers_completion,
                       NULL);
}
Sets a watch to run in case the list of workers changes.
Defines the completion function to be called upon return.
Our implementation caches the list of workers it read last. Upon reading a new list, it replaces the old list. All this happens in the completion function of the zoo_awget_children call:
void workers_completion (int rc,
                         const struct String_vector *strings,
                         const void *data) {
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        get_workers();
        break;
    case ZOK: {
        struct String_vector *tmp_workers =
            removed_and_set(strings, &workers);
        free_vector(tmp_workers);
        get_tasks();
        break;
    }
    default:
        LOG_ERROR(LOGCALLBACK(zh),
                  "Something went wrong when checking workers: %s",
                  rc2string(rc));
        break;
    }
}
Updates the list of workers.
We are not really using the list of workers that have been removed in this example, so just free it. The idea is to use it for reassignments, though. Consider doing this as an exercise.
The next step is getting tasks to be assigned.
To get tasks, the server gets the children of /tasks and takes only the ones that have been added since the last time it read the list. We need to take the difference because otherwise the master might end up assigning the same task twice: two consecutive reads of the children of /tasks can return overlapping elements, for example, when the master has not had enough time to process all the elements of the first read before the second read completes:
void get_tasks () {
    zoo_awget_children(zh,
                       "/tasks",
                       tasks_watcher,
                       NULL,
                       tasks_completion,
                       NULL);
}

void tasks_watcher (zhandle_t *zh,
                    int type,
                    int state,
                    const char *path,
                    void *watcherCtx) {
    if (type == ZOO_CHILD_EVENT) {
        assert( !strcmp(path, "/tasks") );
        get_tasks();
    } else {
        LOG_INFO(LOGCALLBACK(zh),
                 "Watched event: %s",
                 type2string(type));
    }
}

void tasks_completion (int rc,
                       const struct String_vector *strings,
                       const void *data) {
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        get_tasks();
        break;
    case ZOK: {
        LOG_DEBUG(LOGCALLBACK(zh), "Assigning tasks");
        struct String_vector *tmp_tasks = added_and_set(strings, &tasks);
        assign_tasks(tmp_tasks);
        free_vector(tmp_tasks);
        break;
    }
    default:
        LOG_ERROR(LOGCALLBACK(zh),
                  "Something went wrong when checking tasks: %s",
                  rc2string(rc));
        break;
    }
}
If the list of tasks changes, get the tasks again.
Assign only the tasks that are not being assigned already.
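Note that added_and_set, like removed_and_set earlier, is a helper from the example code rather than part of the ZooKeeper API. A minimal sketch of what it might look like, assuming the cache is a heap-allocated struct String_vector * and entries are compared with strcmp (error handling omitted):

/* Return 1 if s appears in vector v. */
static int vector_contains(const struct String_vector *v, const char *s) {
    int i;
    for (i = 0; v != NULL && i < v->count; i++) {
        if (strcmp(v->data[i], s) == 0) return 1;
    }
    return 0;
}

/* Copy a String_vector, duplicating each entry. */
static struct String_vector *vector_dup(const struct String_vector *v) {
    struct String_vector *copy = malloc(sizeof(*copy));
    int i;
    copy->count = v->count;
    copy->data = malloc(v->count * sizeof(char *));
    for (i = 0; i < v->count; i++) {
        copy->data[i] = strdup(v->data[i]);
    }
    return copy;
}

/* Collect the entries of current that are missing from *cached, then
 * replace *cached with a copy of current. The caller frees the returned
 * vector (cf. free_vector). */
struct String_vector *added_and_set(const struct String_vector *current,
                                    struct String_vector **cached) {
    struct String_vector *diff = malloc(sizeof(*diff));
    int i, n = 0;
    diff->data = malloc(current->count * sizeof(char *));
    for (i = 0; i < current->count; i++) {
        if (!vector_contains(*cached, current->data[i])) {
            diff->data[n++] = strdup(current->data[i]);
        }
    }
    diff->count = n;
    if (*cached != NULL) {
        free_vector(*cached);
    }
    *cached = vector_dup(current);
    return diff;
}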
Assigning Tasks
Assigning a task consists of getting the task data, choosing a worker, assigning the task by adding a znode to the list of tasks of the worker, and finally deleting the task from the /tasks znode. These actions are essentially what the following code snippets implement. Getting the task data, assigning the task, and deleting the task are all asynchronous operations and require completion functions. We begin as follows:
void assign_tasks(const struct String_vector *strings) {
    int i;
    for (i = 0; i < strings->count; i++) {
        get_task_data( strings->data[i] );
    }
}

void get_task_data(const char *task) {
    if(task == NULL) return;

    char * tmp_task = strndup(task, 15);
    char * path = make_path(2, "/tasks/", tmp_task);
    zoo_aget(zh,
             path,
             0,
             get_task_data_completion,
             (const void *) tmp_task);
    free(path);
}

struct task_info {
    char* name;
    char *value;
    int value_len;
    char* worker;
};
void get_task_data_completion(int rc, const char *value, int value_len,
                              const struct Stat *stat, const void *data) {
    int worker_index;

    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        get_task_data((const char *) data);
        break;
    case ZOK:
        if(workers != NULL) {
            worker_index = (rand() % workers->count);

            struct task_info *new_task;
            new_task = (struct task_info*) malloc(sizeof(struct task_info));

            new_task->name = (char *) data;
            new_task->value = strndup(value, value_len);
            new_task->value_len = value_len;

            const char * worker_string = workers->data[worker_index];
            new_task->worker = strdup(worker_string);

            task_assignment(new_task);
        }
        break;
    default:
        LOG_ERROR(LOGCALLBACK(zh),
                  "Something went wrong when getting task data: %s",
                  rc2string(rc));
        break;
    }
}
For each task, we first get its data.
Asynchronous call to get task data.
Structure to keep the task context.
Choose worker at random to assign the task to.
Create a new task_info instance to store the task data.
Got task data, so let’s complete the assignment.
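The make_path helper used above is also from the example code. Given how it is called, it presumably concatenates a fixed number of string segments into a freshly allocated path; here is a sketch under that assumption (error handling omitted):

#include <stdarg.h>

/* Concatenate num string segments into a newly allocated path.
 * The caller frees the result. */
char *make_path(int num, ...) {
    va_list ap;
    size_t len = 1;
    int i;

    /* First pass: measure the total length. */
    va_start(ap, num);
    for (i = 0; i < num; i++) {
        len += strlen(va_arg(ap, const char *));
    }
    va_end(ap);

    /* Second pass: copy the segments. */
    char *path = malloc(len);
    path[0] = '\0';
    va_start(ap, num);
    for (i = 0; i < num; i++) {
        strcat(path, va_arg(ap, const char *));
    }
    va_end(ap);
    return path;
}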
So far, the code has read the data of the task and selected a worker. The next step is to create the znode to represent the assignment:
void task_assignment(struct task_info *task) {
    char* path = make_path(4, "/assign/", task->worker, "/", task->name);
    zoo_acreate(zh,
                path,
                task->value,
                task->value_len,
                &ZOO_OPEN_ACL_UNSAFE,
                0,
                task_assignment_completion,
                (const void*) task);
    free(path);
}
void task_assignment_completion (int rc, const char *value, const void *data) {
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        task_assignment((struct task_info*) data);
        break;
    case ZOK:
        if(data != NULL) {
            char * del_path = make_path(2, "/tasks/",
                                        ((struct task_info*) data)->name);
            if(del_path != NULL) {
                delete_pending_task(del_path);
            }
            free(del_path);
            free_task_info((struct task_info*) data);
        }
        break;
    case ZNODEEXISTS:
        LOG_DEBUG(LOGCALLBACK(zh),
                  "Assignment has already been created: %s",
                  value);
        break;
    default:
        LOG_ERROR(LOGCALLBACK(zh),
                  "Something went wrong when creating the assignment: %s",
                  rc2string(rc));
        break;
    }
}
Create the znode representing the task assignment.
Once the task has been assigned, the master deletes the task from the list of unassigned tasks.
We have allocated space in the heap for the task_info instance, so we can now free it.
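free_task_info is likewise part of the example code. Given how get_task_data_completion populates task_info (every field points to heap-allocated memory), it presumably looks something like this sketch:

void free_task_info(struct task_info *task) {
    free(task->name);    /* the strndup'd task name */
    free(task->value);   /* the strndup'd task data */
    free(task->worker);  /* the strdup'd worker name */
    free(task);
}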
The final step is to delete the task from /tasks. Recall that the tasks that haven’t been assigned are kept in /tasks:
void delete_pending_task (const char * path) {
    if(path == NULL) return;

    char * tmp_path = strdup(path);
    zoo_adelete(zh,
                tmp_path,
                -1,
                delete_task_completion,
                (const void*) tmp_path);
}

void delete_task_completion(int rc, const void *data) {
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        delete_pending_task((const char *) data);
        break;
    case ZOK:
        free((char *) data);
        break;
    default:
        LOG_ERROR(LOGCALLBACK(zh),
                  "Something went wrong when deleting task: %s",
                  rc2string(rc));
        break;
    }
}
Asynchronously delete the task.
There isn’t much to do after the task is successfully deleted. Here, we just free the space we allocated previously to the path string.
Single-Threaded versus Multithreaded Clients
The ZooKeeper distribution has two options for the C binding: multithreaded and single-threaded. The multithreaded version is the version we encourage developers to use, whereas the single-threaded version exists mainly for historical reasons. Back at Yahoo!, there were applications that used to run on BSD and were single-threaded. They needed a single-threaded version of the client library to be able to use ZooKeeper. If you’re not in a situation that forces the use of the single-threaded library, then just go with the multithreaded library.
To use the single-threaded version, we can reuse the code shown throughout this chapter, but we need to additionally implement an event loop. For our example, it looks like this:
int initialized = 0;
int run = 0;
fd_set rfds, wfds, efds;

FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
while (!is_expired()) {
    int fd;
    int interest;
    int events;
    struct timeval tv;
    int rc;

    zookeeper_interest(zh, &fd, &interest, &tv);
    if (fd != -1) {
        if (interest & ZOOKEEPER_READ) {
            FD_SET(fd, &rfds);
        } else {
            FD_CLR(fd, &rfds);
        }

        if (interest & ZOOKEEPER_WRITE) {
            FD_SET(fd, &wfds);
        } else {
            FD_CLR(fd, &wfds);
        }
    } else {
        fd = 0;
    }

    /*
     * Master call to get a ZooKeeper handle.
     */
    if(!initialized) {
        if(init(argv[1])) {
            LOG_ERROR(("Error while initializing the master: ", errno));
        }
        initialized = 1;
    }

    /*
     * The next if block contains
     * calls to bootstrap the master
     * and run for master. We only
     * get into it when the client
     * has established a session and
     * is_connected is true.
     */
    if(is_connected() && !run) {
        LOG_INFO(("Connected, going to bootstrap and run for master"));

        /*
         * Create parent znodes
         */
        bootstrap();

        /*
         * Run for master
         */
        run_for_master();
        run = 1;
    }

    rc = select(fd + 1, &rfds, &wfds, &efds, &tv);
    events = 0;
    if (rc > 0) {
        if (FD_ISSET(fd, &rfds)) {
            events |= ZOOKEEPER_READ;
        }
        if (FD_ISSET(fd, &wfds)) {
            events |= ZOOKEEPER_WRITE;
        }
    }
    zookeeper_process(zh, events);
}
Return the events this client is interested in.
Add ZOOKEEPER_READ events to the set of interests.
Add ZOOKEEPER_WRITE events to the set of interests.
Here we start application calls. This is the same init call shown in Starting a Session to get a ZooKeeper handle.
This runs after the master connects to a ZooKeeper server and receives a ZOO_CONNECTED_EVENT. This block bootstraps and runs for master.
Use the select call to wait for new events.
Indicates that a read event on the file descriptor fd has happened.
Indicates that a write event on the file descriptor fd has happened.
Processes pending ZooKeeper events. zookeeper_process is the same call that the multithreaded library uses to process watch events and completions. In the single-threaded case, we have to do it ourselves.
This event loop takes care of the relevant ZooKeeper events, such as callbacks and session events.
To use the multithreaded version, compile the client application with -l zookeeper_mt and define THREADED with the -DTHREADED option. The code uses the THREADED directive to indicate the part of the code in the main call that is to be executed when it is compiled with the multithreaded library. To use the single-threaded version, compile the program with -l zookeeper_st and without the -DTHREADED option. Generate the library you want to use by following the compilation procedure detailed in the ZooKeeper distribution.
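For example, assuming the library and header locations produced by ant compile-native (see Setting Up the Development Environment; adjust the paths to your installation), the two variants might be built along these lines:

gcc -o master master.c -DTHREADED \
    -Ibuild/c/build/usr/include -Lbuild/c/build/usr/lib -lzookeeper_mt

gcc -o master master.c \
    -Ibuild/c/build/usr/include -Lbuild/c/build/usr/lib -lzookeeper_st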
BLOCKING CALLBACKS
If a callback blocks the thread (while it is doing disk I/O, for example), the sessions may time out because the ZooKeeper processing loops do not get the CPU time they need to do processing. This same problem does not occur with the multithreaded library because it uses separate threads for handling I/O and for completion calls.
Takeaway Messages
The C ZooKeeper binding is very popular, and in this chapter we have explored how to develop a ZooKeeper application with it. The flow of the application is not very different from what we have already seen for the Java binding, and the key differences stem mainly from the differences between the languages. For example, here we had to deal with heap management, whereas in Java we pretty much delegate it to the JVM. We also pointed out that ZooKeeper provides the option of implementing multithreaded and single-threaded applications. We strongly encourage developers to go with the multithreaded option, but we do show how to make it single-threaded because this option comes with the distribution.