RESTful Java with JAX-RS 2.0 (2013)
Part II. JAX-RS Workbook
Chapter 27. Examples for Chapter 13
In Chapter 13, you learned how clients can invoke HTTP requests in the background. You also learned how the server side can detach response processing from the original calling thread with an AsyncResponse
. In this chapter, we’ll use both of these features to implement a customer chat service.
Example ex13_1: Chat REST Interface
Before we dive into code, let me explain the REST interface for our chat service. The service will share a URL to both send and receive chat messages. The service will work much like Twitter in that if one user posts a chat, anybody listening for chats will see it. Posting a chat is a simple HTTP POST request. Here’s an example request:
POST
/chat
HTTP/1.1
Host:
localhost:8080
Content-Type:
text/plain
Hello everybody
As you can see, all the user has to do is post a simple text message to the /chat URL and messages will be sent to all listeners.
To receive chat messages, clients will make a blocking GET request to the chat server:
GET
/chat
HTTP/1.1
Host:
localhost:8080
When a chat becomes available, this GET request returns with the next chat message. Additionally, a next
Link
header is sent back with the HTTP response:
HTTP/1.1
200
OK
Content-Type:
text/plain
Link:
</chat?current=1>; rel=next
Hello everybody
We do not want the chat client to lose any messages while it is processing a response. The next
link is a placeholder into the list of messages that are posted to the server. After displaying the chat message, the client will do a new GET request to the server using the URL contained within thenext
Link
header:
GET
/chat?current=1
HTTP/1.1
Host:
localhost:800
The next
link’s URI contains a query parameter identifying to the server the last message the client read. The server will use this index to obtain the next message so that the client sees all messages in order. This new GET request will either block again, or immediately return a queued chat message. The pattern then repeats itself. The response will contain a new next
Link
header with a new pointer into the message queue:
HTTP/1.1
200
OK
Content-Type:
text/plain
Link:
</chat?current=2>; rel=next
What's up?
The server will buffer the latest 10 chat messages in a linked list so that it can easily find the next message a particular chat client needs. This is an example of a HATEOAS flow, where the client transitions its state using a link passed back from the server.
The Client Code
The client is a console program that takes input from the command line while at the same time printing out the current chat message. To run the client, you must specify the name you want to use to post messages as an initial argument when you start up the program.
src/main/java/ChatClient.java
public
class
ChatClient
{
public
static
void
main(String[]
args)
throws
Exception
{
String
name
=
args[0];
...
}
After grabbing the client’s name from the argument list, we then initialize a Client
that we’ll use to invoke on the customer chat service:
final
Client
client
=
new
ResteasyClientBuilder()
.connectionPoolSize(3)
.build();
WebTarget
target
=
client.target("http://localhost:8080/services/chat");
By default, RESTEasy allows only one connection per Client
to be open at one time. So we use the proprietary ClientBuilder
implementation of RESTEasy to set a connection pool size of 3. We also initialize a WebTarget
with the URL of the chat service.
Next, we use the JAX-RS client asynchronous callback API to set up a loop to pull chat messages from the server:
target.request().async().get(new
InvocationCallback<Response>()
{
@Override
public
void
completed(Response
response)
{
Link
next
=
response.getLink("next");
String
message
=
response.readEntity(String.class);
System.out.println();
System.out.print(message);// + "\r");
System.out.println();
System.out.print("> ");
client.target(next).request().async().get(this);
}
@Override
public
void
failed(Throwable
throwable)
{
System.err.println("FAILURE!");
}
});
The code starts off by making an async request to the base chat URI. This invocation registers an InvocationCallback
interface that we’ve implemented as a Java inner class. When the initial GET request is complete, the InvocationCallback.complete()
method is invoked, passing in the Response
from the server. We first extract the next Link
header and the chat message from the Response
. We then print the message to the console. Finally, we make a new asynchronous GET request using the URI contained in the next Link
header. We register the currentInvocationCallback
instance with this new request. This will set up a continuous pull request with the chat service.
After we’ve set up our receive loop, we set up another loop that allows us to send chat messages:
while
(true)
{
System.out.print("> ");
BufferedReader
br
=
new
BufferedReader(
new
InputStreamReader(System.in));
String
message
=
br.readLine();
target.request().post(Entity.text(name
+
": "
+
message));
}
We simply read from stdin
until the user hits Enter and then do an HTTP POST request to the chat service with the command-line input.
The Server Code
The server side is doing a lot of different things to implement our chat service. Let’s break it down:
src/main/java/com/restfully/shop/services/CustomerChat.java
@Path("chat")
public
class
CustomerChat
{
class
Message
{
String
id;
String
message;
Message
next;
}
The CustomerChat
class is annotated with @Path
to specify the root resource path of our JAX-RS service. It then declares a simple inner class called Message
that will represent the queued chat messages. A message is represented by a String id
and a String message
, and also contains a reference to the next queued Message
.
protected
Message
first;
protected
Message
last;
The service remembers what the current first and last message is. It stores these in the first
and last
member variables of the class.
protected
int
maxMessages
=
10;
protected
LinkedHashMap<String,
Message>
messages
=
new
LinkedHashMap<String,
Message>()
{
@Override
protected
boolean
removeEldestEntry(Map.Entry<String,
Message>
eldest)
{
boolean
remove
=
size()
>
maxMessages;
if
(remove)
first
=
eldest.getValue().next;
return
remove;
}
};
Message
objects are stored in a java.util.LinkedHashMap
so that they can be easily looked up when a chat client makes a GET request. The key of this map is the id
of the Message
. The service will always queue the last 10 messages posted to the server. We use a LinkedHashMap
so that we can easily evict the oldest chat message when the maximum number of buffered messages is reached. The removeEldestEntry()
method is used to determine when to evict the oldest entry in the map. It simply checks to see if the size of the map is greater than the maximum amount of messages. It then resets what the first
message is. Returning true
triggers the removal of the eldest entry.
protected
AtomicLong
counter
=
new
AtomicLong(0);
The AtomicLong counter
variable is used to generate message IDs.
LinkedList<AsyncResponse>
listeners
=
new
LinkedList<AsyncResponse>();
The listeners
variable stores a list of waiting chat clients. We’ll see how this is used later.
ExecutorService
writer
=
Executors.newSingleThreadExecutor();
We will have one and only one thread that is responsible for writing response messages back to the chat clients. Having one writer thread is what makes this whole application scale very well. Without asynchronous JAX-RS, this service would require a thread per blocking chat client. While most modern operating systems can handle one or two thousand threads, system performance starts to degrade quickly with all the context switching the operating system has to do. Asynchronous JAX-RS allows us to scale to a much larger number of concurrent users.
Posting a new message
Let’s look at how the service handles a new chat message:
@Context
protected
UriInfo
uriInfo;
@POST
@Consumes("text/plain")
public
void
post(final
String
text)
{
final
UriBuilder
base
=
uriInfo.getBaseUriBuilder();
writer.submit(new
Runnable());
The post()
method consumes plain-text data. The first thing we do is store a UriBuilder
in a local variable of the base URI of our service. We then queue up a task for our writer thread. We cannot use the UriInfo
member variable in this background task. The CustomerChat
class is a singleton and can accept requests concurrently. Because of this, the UriInfo uriInfo
member variable is a proxy that delegates to the request’s actual UriInfo
by using an underlying ThreadLocal
in most JAX-RS vendor implementations. If the writer background thread invokes on this proxy, it would get an error because ThreadLocal
data is not transferred between different threads.
writer.submit(new
Runnable());
{
@Override
public
void
run()
{
synchronized
(messages)
{
Message
message
=
new
Message();
message.id
=
Long.toString(counter.incrementAndGet());
message.message
=
text;
Each new message post is queued up for the writer thread in an implementation of the Runnable
interface. This task starts off by synchronizing on the messages
variable. This protects the critical parts of our message service by serializing access to the messages
map. The code then creates a Message
instance using an id
generated from the counter
.
if
(messages.size()
==
0)
{
first
=
message;
}
else
{
last.next
=
message;
}
The writer thread next checks to see if this is the initial message to the system. If so, it sets the first
member variable to point to the first message posted to the service. Otherwise, it points the tail of the Message
linked list to this new Message
instance.
messages.put(message.id,
message);
last
=
message;
The code then stores the new message in the messages
map and sets the last
member variable to point to this new message.
for
(AsyncResponse
async
:
listeners)
{
try
{
send(base,
async,
message);
}
catch
(Exception
e)
{
e.printStackTrace();
}
}
listeners.clear();
}
}
});
}
Finally, the writer thread loops through all waiting chat clients and sends them the new message.
Handling poll requests
The CustomerChat.receive()
method handles GET requests from chat clients:
@GET
public
void
receive(@QueryParam("current")
String
id,
@Suspended
AsyncResponse
async)
{
final
UriBuilder
base
=
uriInfo.getBaseUriBuilder();
Message
message
=
null;
synchronized
(messages)
{
Message
current
=
messages.get(id);
if
(current
==
null)
message
=
first;
else
message
=
current.next;
if
(message
==
null)
{
queue(async);
}
}
// do this outside of synchronized block to reduce lock hold time
if
(message
!=
null)
send(base,
async,
message);
}
The receive()
method takes a query parameter, current
. This parameter is the id
of the last message the chat client read. This parameter is allowed to be null
if this is the chat client’s first pull request. Injecting the async
parameter via the @Suspended
annotation detaches HTTP response processing from this request thread.
The method then begins by defining a synchronized
block on the messages
variable. This block allows the receive()
method to perform atomic actions that do not conflict with the writer thread. Within the block, the code looks up the current
query parameter in the messages
map. If the message is null
, then the code sets this variable to the first
member variable of the class. Otherwise, it sets the message to the found message’s next
field. If the message is still null
, then there is no message available and the AsyncResponse
is queued for the writer thread to pick up when a message is available.
Finally, after the synchronized
block, if the message is not null
, it is sent immediately back to the chat client.
protected
void
queue(AsyncResponse
async)
{
listeners.add(async);
}
The queue()
method just adds the AsyncResponse
to the listeners
list so the writer thread can pick it up.
protected
void
send(UriBuilder
base,
AsyncResponse
async,
Message
message)
{
URI
nextUri
=
base.clone().path(CustomerChat.class)
.queryParam("current",
message.id).build();
Link
next
=
Link.fromUri(nextUri).rel("next").build();
Response
response
=
Response.ok(message.message,
MediaType.TEXT_PLAIN_TYPE)
.links(next).build();
async.resume(response);
}
The send()
method can be called by the writer thread or the receive()
method. It creates a Response
populated with the message that will be sent back to the chat client. It also calculates and adds a next Link
header to send back with the response. At the end of the method, theAsyncResponse.resume()
method is invoked with the built Response
.
Build and Run the Example Program
You’ll need multiple console windows to run this example. In the first console window, perform the following steps:
1. Change to the ex13_1 directory of the workbook example code.
2. Make sure your PATH is set up to include both the JDK and Maven, as described in Chapter 17.
3. Perform the build and run the example by typing maven jetty:run
.
This will start the JAX-RS services for the example.
Open another console window and do the following.
1. Change to the ex13_1 directory of the workbook example code.
2. Run the chat client by typing maven exec:java -Dexec.mainClass=ChatClient -Dexec.args="your-name"
.
Replace your-name
with your first name. Repeat this process in yet another console window to run a second chat client. Finally, start typing chat messages.