Asynchronous JAX-RS - REST and the JAX-RS Standard - RESTful Java with JAX-RS 2.0 (2013)

Part I. REST and the JAX-RS Standard

Chapter 13. Asynchronous JAX-RS

Another interesting new feature introduced in JAX-RS 2.0 is asynchronous request and response processing both on the client and server side. If you are mashing together a lot of data from different websites or you have something like a stock quote application that needs to push events to hundreds or thousands of idle blocking clients, then the JAX-RS 2.0 asynchronous APIs are worth looking into.

AsyncInvoker Client API

The client asynchronous API allows you to spin off a bunch of HTTP requests in the background and then either poll for a response or register a callback that is invoked when the HTTP response is available. To invoke an HTTP request asynchronously on the client, you interact with the javax.ws.rs.client.AsyncInvoker interface or the submit() methods on javax.ws.rs.client.Invocation. First, let’s take a look at polling HTTP requests that are run in the background.

Using Futures

The AsyncInvoker interface has a bunch of methods that invoke HTTP requests asynchronously and that return a java.util.concurrent.Future instance. You can use the AsyncInvoker methods by invoking the async() method on the Invocation.Builder interface.

package javax.ws.rs.client;

public interface AsyncInvoker {

Future<Response> get();

<T> Future<T> get(Class<T> responseType);

Future<Response> put(Entity<?> entity);

<T> Future<T> put(Entity<?> entity, Class<T> responseType);

Future<Response> post(Entity<?> entity);

<T> Future<T> post(Entity<?> entity, Class<T> responseType);

Future<Response> delete();

<T> Future<T> delete(Class<T> responseType);

...

}

The Future interface is defined within the java.util.concurrent package that comes with the JDK. For JAX-RS, it gives us a nice reusable interface for polling HTTP responses in either a blocking or nonblocking manner. If you’ve used java.util.concurrent.Executors or @Asynchronous within an EJB container, using the Future interface should be very familiar to you.

package java.util.concurrent;

public interface Future<V> {

boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

boolean isDone();

V get() throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

This is best explained in a full example:

Client client = ClientBuilder.newClient();

Future<Response> future1 = client.target("http://example.com/customers/123")

.request()

.async().get();

Future<Order> future2 = client.target("http://foobar.com/orders/456")

.request()

.async().get(Order.class);

// block until complete

Response res1 = future1.get();

Customer result1 = res1.readEntity(Customer.class);

// Wait 5 seconds

try {

Order result2 = future2.get(5, TimeUnit.SECONDS);

} catch (TimeoutException timeout ) {

... handle exception ...

}

In this example, two separate requests are executed in parallel. With future1 we want a full javax.ws.rs.core.Response. After executing both requests, we poll and block indefinitely on future1 by calling Future.get() until we get a Response back from that service.

With future2, we instead poll and block for five seconds only. For this second HTTP asynchronous request, we let JAX-RS automatically unmarshal the HTTP response body into an Order. java.util.concurrent.TimeoutException is thrown if the call takes longer than five seconds. You can also invoke the nonblocking isDone() or isCancelled() methods on Future to see if the request is finished or cancelled.
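The nonblocking isDone() check can be tried out without a server. The following is only a sketch: a plain java.util.concurrent executor task stands in for the HTTP request, and the class name FuturePollingSketch, the simulated latency, and the "order-456" value are assumptions made for illustration. A Future returned by AsyncInvoker would be polled in the same way.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FuturePollingSketch {

    // Submits a task that "responds" after delayMillis, polls isDone()
    // without blocking, and returns how many polls saw the task unfinished.
    static int pollUntilDone(long delayMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(delayMillis);   // simulated network latency
                return "order-456";          // simulated response body
            });
            int polls = 0;
            while (!future.isDone()) {       // nonblocking status check
                polls++;
                Thread.sleep(10);            // other useful work would go here
            }
            return polls;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("polls before completion: " + pollUntilDone(100));
    }
}
```

Polling like this only pays off if the calling thread has something else to do between checks; otherwise a blocking get() with a timeout is simpler.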

Exception handling

Exceptions that can be thrown by Future.get() methods are defined by that interface. java.util.concurrent.TimeoutException is thrown if the timeout expires when we call Future.get() with a timeout. InterruptedException happens if the calling thread has been interrupted. java.util.concurrent.ExecutionException is a wrapper exception. Any exception thrown by the JAX-RS runtime is caught and wrapped by this exception. Let’s expand on the future2 example to see how this works:

// Wait 5 seconds

try {

Order result2 = future2.get(5, TimeUnit.SECONDS);

} catch (TimeoutException timeout ) {

System.err.println("request timed out");

} catch (InterruptedException ie) {

System.err.println("Request was interrupted");

} catch (ExecutionException ee) {

Throwable cause = ee.getCause();

if (cause instanceof WebApplicationException) {

WebApplicationException wae = (WebApplicationException)cause;

wae.getResponse().close();

} else if (cause instanceof ResponseProcessingException) {

ResponseProcessingException rpe = (ResponseProcessingException)cause;

rpe.getResponse().close();

} else if (cause instanceof ProcessingException) {

// handle processing exception

} else {

// unknown

}

}

You can obtain any exception thrown by the JAX-RS runtime when an asynchronous request is executed by calling the ExecutionException.getCause() method. The possible wrapped JAX-RS exceptions are the same as the synchronous ones discussed in Chapter 8.
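The wrapping behavior comes from java.util.concurrent itself, so it can be observed with an ordinary executor. In this sketch, FakeNotFoundException is a hypothetical stand-in for a JAX-RS error such as NotFoundException, and the failing task stands in for the HTTP request; neither is part of the JAX-RS API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExecutionExceptionSketch {

    // Hypothetical stand-in for a JAX-RS error such as NotFoundException.
    static final class FakeNotFoundException extends RuntimeException {
        FakeNotFoundException(String message) {
            super(message);
        }
    }

    // Runs a failing task in the background and describes the unwrapped cause.
    static String describeFailure() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> failing = () -> {
                throw new FakeNotFoundException("HTTP 404 Not Found");
            };
            Future<String> future = executor.submit(failing);
            return future.get();             // never returns normally here
        } catch (ExecutionException ee) {
            Throwable cause = ee.getCause(); // the exception the task threw
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```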

In the example, the call to future2.get() unmarshals the response to an Order object. If the response is something other than 200, “OK,” then the JAX-RS runtime throws one of the exceptions from the JAX-RS error exception hierarchy (e.g., NotFoundException or BadRequestException). If an exception is thrown while you’re unmarshalling the response to an Order, then ResponseProcessingException is thrown.

WARNING

You should always make sure that the underlying JAX-RS response is closed. While most JAX-RS containers will have their Response objects implement a finalize() method, it is not a good idea to rely on the garbage collector to clean up your client connections. If you do not clean up your connections, you may end up with intermittent errors that pop up if the underlying Client or operating system has exhausted its limit of allowable open connections.

In fact, if we examine our initial example a bit further, there’s a lot of code we have to add to ensure that we are being good citizens and closing any open Response objects. Here’s what the final piece of code would look like:

Client client = ClientBuilder.newClient();

Future<Response> future1 = client.target("http://example.com/service")

.request()

.async().get();

Future<Order> future2 = null;

try {

future2 = client.target("http://foobar.com/service2")

.request()

.async().get(Order.class);

} catch (Throwable ignored) {

ignored.printStackTrace();

}

// block until complete

Response res1 = future1.get();

try {

Customer result1 = res1.readEntity(Customer.class);

} catch (Throwable ignored) {

ignored.printStackTrace();

} finally {

res1.close();

}

// if we successfully executed 2nd request

if (future2 != null) {

// Wait 5 seconds

try {

Order result2 = future2.get(5, TimeUnit.SECONDS);

} catch (TimeoutException timeout ) {

System.err.println("request timed out");

} catch (InterruptedException ie) {

System.err.println("Request was interrupted");

} catch (ExecutionException ee) {

Throwable cause = ee.getCause();

if (cause instanceof WebApplicationException) {

WebApplicationException wae = (WebApplicationException)cause;

wae.getResponse().close();

} else if (cause instanceof ResponseProcessingException) {

ResponseProcessingException rpe = (ResponseProcessingException)cause;

rpe.getResponse().close();

} else if (cause instanceof ProcessingException) {

// handle processing exception

} else {

// unknown

}

}

}

As you can see, there are a few more try/catch blocks we need to add to make sure that the response of each async request is closed.

Using Callbacks

The AsyncInvoker interface has an additional callback invocation style. You can register an object that will be called back when the asynchronous invocation is ready for processing:

package javax.ws.rs.client;

public interface AsyncInvoker {

<T> Future<T> get(InvocationCallback<T> callback);

<T> Future<T> post(Entity<?> entity, InvocationCallback<T> callback);

<T> Future<T> put(Entity<?> entity, InvocationCallback<T> callback);

<T> Future<T> delete(InvocationCallback<T> callback);

...

}

The InvocationCallback interface is a parameterized generic interface and has two simple methods you have to implement—one for successful responses, the other for failures:

package javax.ws.rs.client;

public interface InvocationCallback<RESPONSE> {

public void completed(RESPONSE response);

public void failed(Throwable throwable);

}

JAX-RS introspects the application class that implements InvocationCallback to determine whether your callback wants a Response object or if you want to unmarshal to a specific type. Let’s convert our Future example to use callbacks. First, we’ll implement a callback for our initial request:

public class CustomerCallback implements InvocationCallback<Response> {

public void completed(Response response) {

if (response.getStatus() == 200) {

Customer cust = response.readEntity(Customer.class);

} else {

System.err.println("Request error: " + response.getStatus());

}

}

public void failed(Throwable throwable) {

throwable.printStackTrace();

}

}

The CustomerCallback class implements InvocationCallback with a Response generic parameter. This means JAX-RS will call the completed() method and pass in an untouched Response object. If there is a problem sending the request to the server or the JAX-RS runtime is unable to create a Response, the failed() method will be invoked with the appropriate exception. Otherwise, if there is an HTTP response, then completed() will be called.

Next, let’s implement a different callback for our second parallel request. This time we want our successful HTTP responses to be converted into Order objects:

public class OrderCallback implements InvocationCallback<Order> {

public void completed(Order order) {

System.out.println("We received an order.");

}

public void failed(Throwable throwable) {

if (throwable instanceof WebApplicationException) {

WebApplicationException wae = (WebApplicationException)throwable;

System.err.println("Failed with status: " + wae.getResponse().getStatus());

} else if (throwable instanceof ResponseProcessingException) {

ResponseProcessingException rpe = (ResponseProcessingException)throwable;

System.err.println("Failed with status: " + rpe.getResponse().getStatus());

} else {

throwable.printStackTrace();

}

}

}

This case is a little bit different than when we implement InvocationCallback with a Response. If there is a successful HTTP response from the server (like 200, “OK”), JAX-RS will attempt to unmarshal the response into an Order object. If there were an HTTP error response, or the JAX-RS runtime failed to unmarshal the response body into an Order object, then the failed() method is invoked with the appropriate exception. Basically, we see the same kind of exceptions thrown by similar synchronous invocations or the Future example we discussed earlier. You do not have to close the underlying Response object; JAX-RS will do this after completed() or failed() is invoked.

Now that our callback classes have been implemented, let’s finish our example by invoking on some services:

Client client = ClientBuilder.newClient();

Future<Response> future1 = client.target("http://example.com/customers/123")

.request()

.async().get(new CustomerCallback());

Future<Order> future2 = client.target("http://foobar.com/orders/456")

.request()

.async().get(new OrderCallback());

That’s all we have to do. Notice that the get() methods return a Future object. You can ignore this Future, or interact with it as we did previously. I suggest that you only use the Future.cancel() and Future.isDone() methods, though, as you may have concurrency issues with InvocationCallback.
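To see the callback style in isolation, here is a rough model built on CompletableFuture. The Callback interface and the invokeAsync() helper are hypothetical; they only mirror the shape of InvocationCallback and the callback-taking AsyncInvoker methods, and the tasks stand in for HTTP requests. In this model the returned Future completes only after the callback has run, which is a choice made for the sketch and not a statement about any particular JAX-RS implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

final class CallbackSketch {

    // Hypothetical mirror of InvocationCallback: one method per outcome.
    interface Callback<T> {
        void completed(T result);
        void failed(Throwable error);
    }

    // Runs the task on a background thread, notifies the callback, and
    // returns a Future that completes only after the callback has run.
    static <T> Future<T> invokeAsync(Callable<T> task, Callback<T> callback) {
        CompletableFuture<T> done = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            T result;
            try {
                result = task.call();
            } catch (Exception e) {
                callback.failed(e);
                done.completeExceptionally(e);
                return;
            }
            callback.completed(result);
            done.complete(result);
        });
        return done;
    }

    // Issues one successful and one failing call and reports what the
    // callback observed for each.
    static String demo() {
        AtomicReference<String> seen = new AtomicReference<>();
        Callback<String> callback = new Callback<String>() {
            public void completed(String result) { seen.set("completed:" + result); }
            public void failed(Throwable error) { seen.set("failed:" + error.getMessage()); }
        };
        try {
            invokeAsync(() -> "order-456", callback).get();
            String first = seen.get();
            try {
                invokeAsync(() -> { throw new IllegalStateException("boom"); }, callback).get();
            } catch (ExecutionException expected) {
                // the same failure was already delivered to failed()
            }
            return first + "," + seen.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```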

Futures Versus Callbacks

Given that we have two different ways to do asynchronous client invocations, which style should you use? Futures or callbacks? In general, use futures if you need to join a set of requests you’ve invoked asynchronously. By join, I mean you need to know when each of the requests has finished and you need to perform another task after all the asynchronous requests are complete. For example, maybe you are gathering information from a bunch of different web services to build a larger aggregated document (a mashup).

Use callbacks when each invocation is its own distinct unit and you do not have to do any coordination or mashing up.
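A join over several futures might look like the following sketch. Executor tasks stand in for the asynchronous HTTP requests, and the class name, source names, and "data from" strings are invented for illustration. All requests are started before any get() is called, so they run in parallel and the loop only waits for whichever is slowest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class JoinSketch {

    // Starts one background "request" per source, then joins on every
    // future and combines the results into one aggregated document.
    static String mashup(List<String> sources) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String source : sources) {
                futures.add(executor.submit(() -> "data from " + source));
            }
            StringBuilder document = new StringBuilder();
            for (Future<String> future : futures) {
                if (document.length() > 0) {
                    document.append("; ");
                }
                document.append(future.get());   // blocks until this part is ready
            }
            return document.toString();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(mashup(Arrays.asList("customers", "orders")));
    }
}
```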

Server Asynchronous Response Processing

For a typical HTTP server, when a request comes in, one thread is dedicated to the processing of that request and its HTTP response to the client. This is fine for the vast majority of HTTP traffic both on the Internet and on your company’s internal networks. Most HTTP requests are short-lived, so a few hundred threads can easily handle a few thousand concurrent users and have relatively decent response times.

The nature of HTTP traffic started to change somewhat as JavaScript clients started to become more prevalent. One problem that popped up often was the need for the server to push events to the client. A typical example is a stock quote application where you need to update a string of clients with the latest stock price. These clients would make an HTTP GET or POST request and just block indefinitely until the server was ready to send back a response. This resulted in a large amount of open, long-running requests that were doing nothing other than idling. Not only were there a lot of open, idle sockets, but there were also a lot of dedicated threads doing nothing at all. Most HTTP servers were designed for short-lived requests with the assumption that one thread could process requests from multiple concurrent users. When you have a very large number of threads, you start to consume a lot of operating system resources. Each thread consumes memory, and context switching between threads starts to get quite expensive when the OS has to deal with a large number of threads. It became really hard to scale these types of server-push applications since the Servlet API, and by association JAX-RS, was a “one thread per connection” model.

In 2009, the Servlet 3.0 specification introduced asynchronous HTTP. With the Servlet 3.0 API, you can suspend the current server-side request and have a separate thread, other than the calling thread, handle sending back a response to the client. For a server-push app, you could then have a small handful of threads manage sending responses back to polling clients and avoid all the overhead of the “one thread per connection” model. JAX-RS 2.0 introduced a similar API that we’ll discuss in this section.

NOTE

Server-side async response processing is only meant for a specific small subset of applications. Asynchronous doesn’t necessarily mean automatic scalability. For the typical web app, using server asynchronous response processing will only complicate your code and make it harder to maintain. It may even hurt performance.

AsyncResponse API

To use server-side async response processing, you interact with the AsyncResponse interface:

package javax.ws.rs.container;

public interface AsyncResponse {

boolean resume(Object response);

boolean resume(Throwable response);

...

}

You get access to an AsyncResponse instance by injecting it into a JAX-RS method using the @Suspended annotation:

import javax.ws.rs.container.AsyncResponse;

import javax.ws.rs.container.Suspended;

@Path("/orders")

public class OrderResource {

@POST

@Consumes("application/json")

public void submit(final Order order,

final @Suspended AsyncResponse response) {

}

}

Here we have our very familiar OrderResource. Order submission has been turned into an asynchronous operation. When you inject an instance of AsyncResponse using the @Suspended annotation, the HTTP request becomes suspended from the current thread of execution. In this particular example, the OrderResource.submit() method will never send back a response to the client. The client will just time out with an error condition. Let’s expand on this example:

import javax.ws.rs.container.AsyncResponse;

import javax.ws.rs.container.Suspended;

@Path("/orders")

public class OrderResource {

@POST

@Consumes("application/json")

@Produces("application/json")

public void submit(final Order order,

final @Suspended AsyncResponse response) {

new Thread() {

public void run() {

OrderConfirmation confirmation = orderProcessor.process(order);

response.resume(confirmation);

}

}.start();

}

}

In the previous example, the client would just time out. Now, the OrderResource.submit() method spawns a new thread to handle order submission. This background thread processes the Order to obtain an OrderConfirmation. It then sends a response back to the client by calling the AsyncResponse.resume() method, passing in the OrderConfirmation instance. Invoking resume() in this manner means that it is a successful response. So, a status code of 200 is sent back to the client. Also, because we’re passing a Java object, the resume() method will marshal this object and send it within the HTTP response body. The media type used is determined by the @Produces annotation placed on the original JAX-RS method. If the @Produces annotation has more than one value, then the request’s Accept header is examined to pick the returned media type. Basically, this is the same algorithm a regular JAX-RS method uses to determine the media type.
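The hand-off between the request thread and the background thread can be modeled outside a container. In this sketch, PendingResponse is a hypothetical stand-in for AsyncResponse built on CompletableFuture; it is not the JAX-RS interface, and awaitEntity() plays the part of the container waiting to write the response. The sketch also shows why resume() returns a boolean: only the first call takes effect.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class SuspendResumeSketch {

    // Hypothetical model of a suspended response; not the JAX-RS interface.
    static final class PendingResponse {
        private final CompletableFuture<Object> outcome = new CompletableFuture<>();

        // Returns true only for the call that actually completes the response.
        boolean resume(Object entity) {
            return outcome.complete(entity);
        }

        boolean isSuspended() {
            return !outcome.isDone();
        }

        // Stands in for the container waiting to write the response.
        Object awaitEntity() {
            try {
                return outcome.get(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    // Plays the role of the resource method: hands the work to another
    // thread and returns while the response is still pending.
    static void submit(String order, PendingResponse response) {
        new Thread(() -> response.resume("confirmation for " + order)).start();
    }

    static String demo() {
        PendingResponse response = new PendingResponse();
        submit("order-456", response);
        Object entity = response.awaitEntity();
        boolean second = response.resume("ignored");   // already resumed
        return entity + " / second resume accepted: " + second;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```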

Alternatively, you can pass resume() a Response object to send the client a more specific response:

import javax.ws.rs.container.AsyncResponse;

import javax.ws.rs.container.Suspended;

@Path("/orders")

public class OrderResource {

@POST

@Consumes("application/json")

public void submit(final Order order,

final @Suspended AsyncResponse response) {

new Thread() {

public void run() {

OrderConfirmation confirmation = orderProcessor.process(order);

Response result = Response.ok(confirmation,

MediaType.APPLICATION_XML_TYPE)

.build();

response.resume(result);

}

}.start();

}

}

In this example, we’ve manually created a Response. We set the entity to the OrderConfirmation and the content type to XML.

Exception Handling

In Chapter 7, we discussed what happens when a JAX-RS method throws an exception. When you invoke AsyncResponse.resume(Object), the response filter and interceptor chains (see Chapter 12) are invoked, and then finally the MessageBodyWriter. If an exception is thrown by any one of these components, then the exception is handled in the same way as its synchronous counterpart with one caveat. Unhandled exceptions are not propagated, but instead the server will return a 500, “Internal Server Error,” back to the client.

Finally, the previous example is pretty simple, but what if it were possible for orderProcessor.process() to throw an exception? We can handle this exception by using the AsyncResponse.resume(Throwable) method:

import javax.ws.rs.container.AsyncResponse;

import javax.ws.rs.container.Suspended;

@Path("/orders")

public class OrderResource {

@POST

@Consumes("application/json")

public void submit(final Order order,

final @Suspended AsyncResponse response) {

new Thread() {

public void run() {

OrderConfirmation confirmation = null;

try {

confirmation = orderProcessor.process(order);

} catch (Exception ex) {

response.resume(ex);

return;

}

Response result = Response.ok(confirmation,

MediaType.APPLICATION_XML_TYPE)

.build();

response.resume(result);

}

}.start();

}

}

Invoking AsyncResponse.resume(Throwable) is like throwing an exception from a regular synchronous JAX-RS method. Standard JAX-RS exception handling is performed on the passed-in Throwable. If a matching ExceptionMapper exists for the passed-in Throwable, it will be used. Otherwise, the server will send back a 500 status code.
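The mapper-or-500 rule can be pictured as a lookup that walks up the exception's class hierarchy. The sketch below is a simplified model only: the map of exception classes to status codes is a hypothetical replacement for registered ExceptionMapper instances, and it ignores WebApplicationException, which carries its own response.

```java
import java.util.HashMap;
import java.util.Map;

final class MapperLookupSketch {

    // Finds the status registered for the nearest superclass of the error,
    // falling back to 500 when nothing matches.
    static int statusFor(Throwable error, Map<Class<?>, Integer> mappers) {
        for (Class<?> type = error.getClass(); type != null; type = type.getSuperclass()) {
            Integer status = mappers.get(type);
            if (status != null) {
                return status;
            }
        }
        return 500;
    }

    static String demo() {
        Map<Class<?>, Integer> mappers = new HashMap<>();
        mappers.put(IllegalArgumentException.class, 400);
        // NumberFormatException extends IllegalArgumentException, so it is mapped.
        int mapped = statusFor(new NumberFormatException("bad quantity"), mappers);
        // Nothing is registered for IllegalStateException or its superclasses.
        int unmapped = statusFor(new IllegalStateException("processor down"), mappers);
        return mapped + "," + unmapped;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```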

Cancel

There are a few other convenience methods on AsyncResponse we haven’t covered yet:

package javax.ws.rs.container;

public interface AsyncResponse {

boolean cancel();

boolean cancel(int retryAfter);

boolean cancel(Date retryAfter);

...

}

Each of the cancel() methods is really a precanned call to resume():

// cancel()

response.resume(Response.status(503).build());

// cancel(int)

response.resume(Response.status(503)

.header(HttpHeaders.RETRY_AFTER, retryAfter)

.build());

// cancel(Date)

response.resume(Response.status(503)

.header(HttpHeaders.RETRY_AFTER, date)

.build());

Internally, a Response object is built with a 503 status code. For cancel() methods that accept input, the parameter is used to initialize a Retry-After HTTP response header.
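HTTP allows a Retry-After value to be either a number of seconds or an HTTP date. The following sketch shows what the two forms look like on the wire; the class and method names are hypothetical, and how a given JAX-RS implementation renders the header is up to that implementation.

```java
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Locale;

final class RetryAfterSketch {

    // HTTP dates use a fixed-width English format and are always in GMT.
    private static final DateTimeFormatter HTTP_DATE =
        DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.US)
                         .withZone(ZoneOffset.UTC);

    // Delay form: the client should wait this many seconds.
    static String retryAfter(int seconds) {
        return Integer.toString(seconds);
    }

    // Date form: the client should not retry before this instant.
    static String retryAfter(Date date) {
        return HTTP_DATE.format(date.toInstant());
    }

    public static void main(String[] args) {
        System.out.println("Retry-After: " + retryAfter(100));
        System.out.println("Retry-After: " + retryAfter(new Date(0L)));
    }
}
```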

Status Methods

There are a few status methods on AsyncResponse that specify the state of the response:

public interface AsyncResponse {

boolean isSuspended();

boolean isCancelled();

boolean isDone();

...

}

The AsyncResponse.isCancelled() method can be called to see if an AsyncResponse has been cancelled. isSuspended() specifies whether or not the response can have resume() or cancel() invoked. The isDone() method tells you if the response is finished.

Timeouts

If an AsyncResponse is not resumed or cancelled, it will eventually time out. The default timeout is container-specific. A timeout results in a 503, “Service Unavailable,” response code sent back to the client. You can explicitly set the timeout by invoking the setTimeout() method:

response.setTimeout(5, TimeUnit.SECONDS);

You can also register a callback that is triggered when a timeout occurs by implementing the TimeoutHandler interface. For example:

response.setTimeoutHandler(

new TimeoutHandler() {

public void handleTimeout(AsyncResponse response) {

response.resume(Response.serverError().build());

}

}

);

Here, instead of sending the default 503 response code to the client on a timeout, the example registers a TimeoutHandler that sends a 500 response code instead.
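The race between a worker and a timeout can be modeled with a scheduler and a CompletableFuture. This is a sketch only: the pending future stands in for the suspended response, the two scheduled tasks stand in for the worker thread and the timeout handler, and the status codes are passed in as plain integers. Whichever task completes the future first decides what the client sees.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class TimeoutSketch {

    // Returns the status the "client" would see: 200 if the worker resumes
    // before the timeout, otherwise the status chosen by the timeout handler.
    static int statusSeenByClient(long workMillis, long timeoutMillis, int timeoutStatus) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            CompletableFuture<Integer> pending = new CompletableFuture<>();
            // Worker: resumes with 200 once the simulated work is finished.
            scheduler.schedule(() -> pending.complete(200), workMillis, TimeUnit.MILLISECONDS);
            // Timeout handler: has an effect only if the response is still pending.
            scheduler.schedule(() -> pending.complete(timeoutStatus), timeoutMillis, TimeUnit.MILLISECONDS);
            return pending.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();   // drop whichever task lost the race
        }
    }

    public static void main(String[] args) {
        System.out.println("fast worker: " + statusSeenByClient(10, 1000, 503));
        System.out.println("slow worker: " + statusSeenByClient(1000, 10, 500));
    }
}
```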

Callbacks

The AsyncResponse interface also allows you to register callback objects for other types of events:

package javax.ws.rs.container;

public interface CompletionCallback {

public void onComplete(Throwable throwable);

}

CompletionCallback.onComplete() is called after the response has been sent to the client. The Throwable is set to any unmapped exception thrown internally when processing a resume(). Otherwise, it is null.

package javax.ws.rs.container;

public interface ConnectionCallback {

public void onDisconnect(AsyncResponse response);

}

JAX-RS containers are not required to support ConnectionCallback. Where it is supported, it allows you to be notified if the socket connection is disconnected while processing the response.

You enable callbacks by invoking the AsyncResponse.register() methods. You can pass one or more classes that will be instantiated by the JAX-RS container, and you can pass one or more instances:

response.register(MyCompletionCallback.class);

response.register(new MyConnectionCallback());

Callbacks are generally used to receive notification of error conditions caused after invoking resume(). You may have resources to clean up or even transactions to roll back or undo as a result of an asynchronous failure.

Use Cases for AsyncResponse

The examples used in the previous section were really contrived to make it simple to explain the behavior of the asynchronous APIs. As I mentioned before, there is a specific set of use cases for async response processing. Let’s go over them.

Server-side push

With server-side push, the server is sending events back to the client. A typical example is stock quotes. The client wants to be notified when a new quote is available. It does a long-poll GET request until the quote is ready.

Client client = ClientBuilder.newClient();

final WebTarget target = client.target("http://quote.com/quote/RHT");

target.request().async().get(new InvocationCallback<String>() {

public void completed(String quote) {

System.out.println("RHT: " + quote);

target.request().async().get(this);

}

public void failed(Throwable t) {}

});

The preceding continuously polls for a quote using InvocationCallback. On the server side, we want our JAX-RS resource classes to use suspended requests so that we can have one thread that writes quotes back to polling clients. With one writer thread, we can scale this quote service to thousands and thousands of clients, as we’re not beholden to a “one thread per request” model. Here’s what the JAX-RS resource class might look like:

@Path("quote/RHT")

public class RHTQuoteResource {

protected List<AsyncResponse> responses = new ArrayList<AsyncResponse>();

@GET

@Produces("text/plain")

public void getQuote(@Suspended AsyncResponse response) {

synchronized (responses) {

responses.add(response);

}

}

}

The example code is overly simplified, but the idea is that there is a List of AsyncResponse objects that are waiting for the latest stock quote for Red Hat. This List would be shared by a background thread that would send a response back to all waiting clients when a new quote for Red Hat became available.

Executor executor = Executors.newSingleThreadExecutor();

final List<AsyncResponse> responses = ...;

final Ticker rhtTicker = nyse.getTicker("RHT");

executor.execute(new Runnable() {

public void run() {

while (true) {

String quote = rhtTicker.await();

synchronized (responses) {

for (AsyncResponse response : responses) response.resume(quote);

}

}

}

});

So, here we’re starting a background thread that runs continuously using the Executors class from the java.util.concurrent package that comes with the JDK. This thread blocks until a quote for Red Hat is available. Then it loops over every awaiting AsyncResponse to send the quote back to each client. Some of the implementation is missing here, but hopefully you get the idea.
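One of the missing pieces is forgetting clients once they have been answered, so that the next quote goes only to clients that have polled again. A rough model of that bookkeeping, with CompletableFuture standing in for AsyncResponse and hypothetical names throughout, might look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class QuoteFanOutSketch {

    private final List<CompletableFuture<String>> waiting = new ArrayList<>();

    // Plays the role of the resource method: park the caller and return.
    CompletableFuture<String> await() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        synchronized (waiting) {
            waiting.add(pending);
        }
        return pending;
    }

    // Plays the role of the writer thread: answer every waiting client,
    // then forget them. Returns how many clients received the quote.
    int publish(String quote) {
        List<CompletableFuture<String>> batch;
        synchronized (waiting) {
            batch = new ArrayList<>(waiting);
            waiting.clear();
        }
        for (CompletableFuture<String> pending : batch) {
            pending.complete(quote);
        }
        return batch.size();
    }

    static String demo() {
        QuoteFanOutSketch quotes = new QuoteFanOutSketch();
        CompletableFuture<String> first = quotes.await();
        CompletableFuture<String> second = quotes.await();
        int delivered = quotes.publish("52.10");
        int deliveredAgain = quotes.publish("52.20");   // nobody has repolled yet
        return delivered + ":" + first.join() + ":" + second.join() + ":" + deliveredAgain;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Copying the list inside the synchronized block and completing the futures outside it keeps the lock from being held while responses are written.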

Publish and subscribe

Another great use case for AsyncResponse is publish and subscribe applications, an example being a chat service. Here’s what the server code might look like:

@Path("chat")

public class ChatResource {

protected List<AsyncResponse> responses = new ArrayList<AsyncResponse>();

@GET

@Produces("text/plain")

public synchronized void receive(@Suspended AsyncResponse response) {

responses.add(response);

}

@POST

@Consumes("text/plain")

public synchronized void send(String message) {

for (AsyncResponse response : responses) {

response.resume(message);

}

}

}

This is a really poor chat implementation, as messages could be lost for clients that are repolling, but hopefully it illustrates how you might create such an application. In Chapter 27, we’ll implement a more robust and complete chat service.

NOTE

With protocols like WebSocket[13] and Server-Sent Events (SSE)[14] being supported in most browsers, pure HTTP server push and pub-sub are fast becoming legacy. So, if you’re only going to have browser clients for this kind of app, you’re probably better off using WebSockets or SSE.

Priority scheduling

Sometimes there are certain services that are highly CPU-intensive. If you have too many of these types of requests running, you can completely starve users who are making simple, fast requests. To resolve this issue, you can queue up these expensive requests in a separate thread pool that guarantees that only a few of these expensive operations will happen concurrently:

@Path("orders")

public class OrderResource {

Executor executor;

public OrderResource() {

executor = Executors.newSingleThreadExecutor();

}

@POST

@Path("year_to_date_report")

@Produces("application/json")

public void ytdReport(final @FormParam("product") String product,

final @Suspended AsyncResponse response) {

executor.execute( new Runnable() {

public void run() {

Report report = generateYTDReportFor(product);

response.resume(report);

}

});

}

protected Report generateYTDReportFor(String product) {

...

}

}

Here we’re back to our familiar OrderResource again. We have a ytdReport() method that calculates buying patterns for a specific product for the year to date. We want to allow only one of these requests to execute at a time, as the calculation is extremely expensive. We set up a single-threaded java.util.concurrent.Executor in the OrderResource constructor. The ytdReport() method queues up a Runnable in this Executor that generates the report and sends it back to the client. If the Executor is currently busy generating a report, the request is queued up and executed after that report is finished.
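The queueing behavior of a single-threaded executor can be checked directly. In this sketch, a short sleep stands in for the expensive report and the class name is invented for illustration; the method records the highest number of reports that were ever running at the same moment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SerializedReportsSketch {

    // Queues the given number of simulated reports on a single-threaded
    // executor and returns the peak number that ran concurrently.
    static int maxConcurrentReports(int requests) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(requests);
        try {
            for (int i = 0; i < requests; i++) {
                executor.execute(() -> {
                    int now = running.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(20);            // simulated expensive report
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                    finished.countDown();
                });
            }
            finished.await(5, TimeUnit.SECONDS);     // wait for the queue to drain
            return maxSeen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent reports: " + maxConcurrentReports(5));
    }
}
```

Swapping in Executors.newFixedThreadPool(n) would raise the cap from one report at a time to n.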

Wrapping Up

In this chapter, we discussed how you can use JAX-RS asynchronously both on the client and server side. On the client, you can execute one or more requests in the background and either poll for their response, or receive a callback. On the server, we saw that you can suspend requests so that a different thread can handle response processing. This is a great way to scale specific kinds of applications. Chapter 27 walks you through a bunch of code examples that show most of these features in action.


[13] For more information, see http://www.websocket.org.

[14] For more information, see http://www.w3.org/TR/2011/WD-eventsource-20110208.