Java 8 Recipes, 2nd Edition (2014)

CHAPTER 10. Concurrency

Concurrency is one of the toughest topics to handle in modern computer programming; understanding concurrency requires the ability to think abstractly, and debugging concurrent problems is like trying to pilot an airplane by dead reckoning. Even so, with modern releases of Java, it has become easier (and more accessible) to write bug-free concurrent code.

Concurrency is the ability of a program to execute different (or the same) instructions at the same time. A program that is said to be concurrent can be split up and run on multiple CPUs. By writing concurrent programs, you take advantage of today’s multicore CPUs. You can even see benefits on a single-core CPU when a program is I/O intensive.

In this chapter, we present recipes for the most common concurrency tasks, from running a background task to splitting a computation into work units. Throughout the chapter, you will find the most up-to-date recipes for accomplishing concurrency in Java 8.

10-1. Starting a Background Task

Problem

You have a task that needs to run outside of your main thread.

Solution

Create a class that contains the task that needs to run in a different thread, implement the Runnable interface, and start a new Thread. In the following example, a counter is used to simulate activity while a separate task runs in the background.

Note The code in this example could be refactored to use a method reference (see Chapter 6) rather than an anonymous inner class for the Runnable implementation. However, for clarity, the anonymous inner class is shown.

private void someMethod() {
Thread backgroundThread = new Thread(new Runnable() {
public void run() {
doSomethingInBackground();
}
},"Background Thread");

System.out.println("Start");
backgroundThread.start();
for (int i= 0;i < 10;i++) {
System.out.println(Thread.currentThread().getName()+": is counting "+i);
}

System.out.println("Done");
}

private void doSomethingInBackground() {
System.out.println(Thread.currentThread().getName()+
": is Running in the background");
}

If the code is executed more than once, the output will likely differ from run to run. The background thread executes independently, so its message is printed at a different point in each run.

The same code for creating the background thread can be written as follows if you’re using lambda expressions:

Thread backgroundThread = new Thread(this::doSomethingInBackground, "Background Thread");

How It Works

The Thread class allows executing code in a new thread (path of execution), distinct from the current thread. The Thread constructor requires a parameter that implements the Runnable interface. The Runnable interface requires the implementation of only one method: public void run(). Hence, it is a functional interface, which facilitates the use of lambda expressions. When the Thread.start() method is invoked, it will in turn create the new thread and invoke the run() method of the Runnable.

Within the JVM are two types of threads: User and Daemon. User threads keep executing until their run() method completes, whereas Daemon threads can be terminated if the application needs to exit. An application exits if there are only Daemon threads running in the JVM. When you start to create multithreaded applications, you must be aware of these differences and understand when to use each type of thread.

Usually, Daemon threads have a run() method that doesn’t complete; for example, a while (true) loop. This allows these threads to periodically check or update some condition throughout the life of the program and be discarded when the program finishes executing. In contrast, User threads, while alive, will execute and prevent the program from terminating. If you happen to have a program that is not closing and/or exiting when expected, you might want to check the threads that are actively running.

To set a thread as a Daemon thread, use thread.setDaemon(true) before calling the thread.start() method. By default, Thread instances are created as User thread types.
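For example, the following sketch (the thread name and sleep interval are arbitrary, not part of the recipe) starts a daemon "heartbeat" thread that will not keep the JVM alive on its own:

Thread heartbeatThread = new Thread(() -> {
    while (true) {
        System.out.println("heartbeat...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            return; // stop if interrupted
        }
    }
}, "Heartbeat Thread");
heartbeatThread.setDaemon(true); // must be called before start()
heartbeatThread.start();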

Note This recipe shows the simplest way to create and execute a new thread. The new thread created is a User thread, which means that the application will not exit until both the main thread and the background thread are done executing.

10-2. Updating (and Iterating) a Map

Problem

You need to update a Map object from multiple threads, and you want to make sure that the update doesn’t break the contents of the Map object and that the Map object is always in a consistent state. You also want to traverse (look at) the content of the Map object while other threads are updating the Map object.

Solution

Use a ConcurrentMap to update Map entries. The following example creates 1,000 threads, each of which tries to modify the Map at the same time. The main thread waits for a second and then proceeds to iterate through the Map (even while the other threads are still modifying the Map):

Set<Thread> updateThreads = new HashSet<>();

private void start() {
ConcurrentMap<Integer,String> concurrentMap = new ConcurrentHashMap<>();
for (int i =0;i < 1000;i++) {
startUpdateThread(i, concurrentMap);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
concurrentMap.entrySet().stream().forEach((entry) -> {
System.out.println("Key :"+entry.getKey()+" Value:"+entry.getValue());
});

updateThreads.stream().forEach((thread) -> {
thread.interrupt();
});
}

Random random = new Random();
private void startUpdateThread(int i, final ConcurrentMap<Integer, String> concurrentMap) {
Thread thread = new Thread(() -> {
while (!Thread.interrupted()) {
int randomInt = random.nextInt(20);
concurrentMap.put(randomInt, UUID.randomUUID().toString());
}
});
thread.setName("Update Thread "+i);
updateThreads.add(thread);
thread.start();
}

How It Works

ConcurrentHashMap allows multiple threads to modify a hash table concurrently and safely. ConcurrentHashMap is a hash table supporting full concurrency for retrievals and high concurrency for updates. In the example, 1,000 threads make modifications to the Map over a short period of time. The ConcurrentHashMap iterator, as well as streams generated on a ConcurrentHashMap, allows safe iteration over its contents. When using the ConcurrentMap’s iterator, you do not have to worry about locking the contents of the ConcurrentMap while iterating over it (and it doesn’t throw ConcurrentModificationExceptions).

For a complete list of the methods added in Java 8 (such as compute(), merge(), search(), and forEach()), refer to the online documentation at http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html.
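As a brief illustration of two of those additions, merge() and compute() update an entry atomically. The following sketch (the word-count map is hypothetical and not part of the recipe’s example) increments a counter without an explicit check-then-act:

ConcurrentMap<String, Integer> wordCounts = new ConcurrentHashMap<>();
// Insert 1 for a new key, or atomically add 1 to the existing value.
wordCounts.merge("java", 1, Integer::sum);
// Atomically recompute the entry based on its current value.
wordCounts.compute("java", (word, count) -> count == null ? 1 : count + 1);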

Note ConcurrentMap iterators, while thread-safe, don’t guarantee that you will see entries added/updated after the iterator was created.

10-3. Inserting a Key into a Map Only if the Key Is not Already Present

Problem

A Map within your application is continuously being updated, and you need to put a key/value pair into it if the key does not already exist. Therefore, you need to check for the key’s presence, and you need assurance that some other thread doesn’t insert the same key in the meantime.

Solution

The ConcurrentMap.putIfAbsent() method checks for the key and inserts the value in a single atomic step, and its return value tells you whether the map was modified. For example, the following code uses the method to check and insert in a single step, thus avoiding the concurrency problem:

private void start() {
ConcurrentMap<Integer, String> concurrentMap = new ConcurrentHashMap<>();
for (int i = 0; i < 100; i++) {
startUpdateThread(i, concurrentMap);
}

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

concurrentMap.entrySet().stream().forEach((entry) -> {
System.out.println("Key :" + entry.getKey() + " Value:" + entry.getValue());
});

}

Random random = new Random();

private void startUpdateThread(final int i, final ConcurrentMap<Integer, String> concurrentMap) {
Thread thread = new Thread(() -> {
int randomInt = random.nextInt(20);
String previousEntry = concurrentMap.putIfAbsent(randomInt, "Thread # " + i + " has made it!");
if (previousEntry != null) {
System.out.println("Thread # " + i + " tried to update it but guess what,
we're too late!");
} else {
System.out.println("Thread # " + i + " has made it!");
}
});
thread.start();
}

When running the program, some of the entries will be successfully inserted, while others will not because the key has already been inserted by another thread.

How It Works

Updating a Map concurrently is difficult because it involves two steps: a check-then-act type of operation. First, the Map has to be checked to see whether an entry already exists in it. If the entry doesn’t exist, you can put the key and the value into the Map. On the other hand, if the key exists, the value for the key is retrieved. To do so, we use the ConcurrentMap’s putIfAbsent atomic operation. This ensures that either the key was present and so the value is not overwritten, or the key was not present and so the value is set. For the JDK implementations of ConcurrentMap, the putIfAbsent() method returns null if there was no value for the key, or the current value if the key already has one. By checking that the putIfAbsent() method returns null, you are assured that the operation was successful and that a new entry in the map has been created.

There are cases in which computing the value for putIfAbsent() is expensive. For example, if the value is the result of a large database query, executing the query every time and then invoking putIfAbsent() will not be efficient. In this kind of scenario, you can first call the map’s containsKey() method to ensure that the key is not present. Only if it’s not present do you run the expensive query and call putIfAbsent(). There is still a chance that putIfAbsent() doesn’t put the entry (because another thread put it first), but this check reduces the number of potentially expensive value creations.

See the following code snippet:

keyPresent = concurrentMap.containsKey(randomInt);
if (!keyPresent) {
concurrentMap.putIfAbsent(randomInt, "Thread # " + i + " has made it!");
}

In this code, the first operation checks whether the key is already in the map. If it is, the putIfAbsent() operation (and the expensive value creation) is skipped. If the key is not present, we proceed to execute the putIfAbsent() operation.
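Java 8 also adds computeIfAbsent(), which invokes the value-producing function only when the key is missing, so the expensive work is skipped for keys that already exist. The following sketch assumes a hypothetical expensiveDatabaseQuery() method; note that the mapping function should be short and must not modify the same map:

concurrentMap.computeIfAbsent(randomInt, key -> expensiveDatabaseQuery(key));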

If you are accessing the values of the map from different threads, you should make sure that the values are thread-safe. This is most evident when using collections as values because they then could be accessed from different threads. Ensuring that the main map is thread-safe will prevent concurrent modifications to the map. However, once you gain access to the values of the map, you must exercise good concurrency practices around the values of the map.

Note ConcurrentMaps do not allow null keys, unlike their non-thread-safe cousin HashMap (which does allow null keys).

10-4. Iterating Through a Changing Collection

Problem

You need to iterate over each element in a collection. However, other threads are constantly updating the collection.

Solution 1

By using CopyOnWriteArrayList, you can safely iterate through the collection without worrying about concurrency. In the following solution, the startUpdatingThread() method creates a new thread, which actively changes the list passed to it. While startUpdatingThread() modifies the list, it is concurrently iterated using the stream forEach() function.

private void copyOnWriteSolution() {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
startUpdatingThread(list);
list.stream().forEach((element) -> {
System.out.println("Element :" + element);
});
stopUpdatingThread();

}

Solution 2

Using Collections.synchronizedList() allows you to change the collection atomically. A synchronized list also provides an object on which you can safely synchronize while iterating through it (which is done here around the stream). For example:

private void synchronizedListSolution() {
final List<String> list = Collections.synchronizedList(new ArrayList<String>());
startUpdatingThread(list);
synchronized (list) {
list.stream().forEach((element) -> {
System.out.println("Element :" + element);
});
}
stopUpdatingThread();
}

How It Works

Java comes with many concurrent collection options. Which collection to use depends on how the read operations compare with the write operations within the context of your application. If writes are few and far between compared with reads, a CopyOnWriteArrayList instance is most efficient because it doesn’t block (stop) other threads from reading the list and is thread-safe to iterate over (no ConcurrentModificationException is thrown when iterating through it). If writes are about as frequent as reads, a synchronized list is the preferred choice.

In solution 1, the CopyOnWriteArrayList is being updated while you traverse the list. Because the recipe uses a CopyOnWriteArrayList instance, there is no need to worry about thread safety when iterating through the collection (as is being done in this recipe by using the stream). It is good to note that the CopyOnWriteArrayList offers a snapshot in time when iterating through it. If another thread modifies the list as you’re iterating through it, chances are that the modifications will not be visible during that iteration.

Note Locking properly depends on the type of collection being used. Any collection returned by the Collections.synchronized* methods can be locked via the collection itself (synchronized (collectionInstance)). However, some more efficient (newer) concurrent collections, such as ConcurrentMap, cannot be used in this fashion because their internal implementations don’t lock on the object itself.

Solution 2 creates a synchronized list by using the Collections helper class. The Collections.synchronizedList() method wraps a List object (it can be an ArrayList, a LinkedList, or another List implementer) into a list that synchronizes access to the list operations. Each time you need to iterate over a list (whether by using a stream, a for loop, or an iterator), you must be aware of the concurrency implications for that list’s iterator. The CopyOnWriteArrayList is safe to iterate over (as specified in its Javadoc), but the synchronizedList iterator must be synchronized manually (as specified in the Collections.synchronizedList() Javadoc). In the solution, the list can safely be iterated while inside the synchronized(list) block. When synchronizing on the list, no reads, updates, or other iterations can occur until the synchronized(list) block is completed.

10-5. Coordinating Different Collections

Problem

You need to modify different but related collections at the same time and you want to ensure that no other threads can see these modifications until they have been completed.

Solution 1

By synchronizing on a principal collection, you can guarantee that both collections are updated as a single unit. In the following example, the fulfillOrder() method needs to check the inventory of the order to be fulfilled, and if there is enough inventory to fulfill the order, it needs to add the order to the customerOrders list. The fulfillOrder() method synchronizes on the inventoryMap map and modifies both the inventoryMap and the customerOrders list before finishing the synchronized block.

private boolean fulfillOrder(String itemOrdered, int quantityOrdered, String customerName) {
synchronized (inventoryMap) {
int currentInventory = inventoryMap.get(itemOrdered);
if (currentInventory < quantityOrdered) {
System.out.println("Couldn't fulfill order for "+customerName+" not enough
"+itemOrdered+" ("+quantityOrdered+")");
return false; // sorry, we sold out
}
inventoryMap.put(itemOrdered,currentInventory - quantityOrdered);
CustomerOrder order = new CustomerOrder(itemOrdered, quantityOrdered, customerName);
customerOrders.add(order);
System.out.println("Order fulfilled for "+customerName+" of "+itemOrdered+"
("+quantityOrdered+")");
return true;
}
}

private void checkInventoryLevels() {
synchronized (inventoryMap) {
System.out.println("------------------------------------");
inventoryMap.entrySet().stream().forEach((inventoryEntry) -> {
System.out.println("Inventory Level :"+inventoryEntry.getKey()+"
"+inventoryEntry.getValue());
});
System.out.println("------------------------------------");
}
}

private void displayOrders() {
synchronized (inventoryMap) {
customerOrders.stream().forEach((order) -> {
System.out.println(order.getQuantityOrdered()+" "+order.getItemOrdered()+" for "+order.getCustomerName());
});
}
}

Solution 2

Using a reentrant lock, you can prevent multiple threads from accessing the same critical area of the code. In this solution, the inventoryLock is acquired by calling inventoryLock.lock(). Any other thread that tries to acquire the inventoryLock lock will have to wait until the inventoryLock lock is released. At the end of the fulfillOrder() method (in the finally block), the inventoryLock is released by calling the inventoryLock.unlock() method:

Lock inventoryLock = new ReentrantLock();
private boolean fulfillOrder(String itemOrdered, int quantityOrdered, String customerName) {
try {
inventoryLock.lock();
int currentInventory = inventoryMap.get(itemOrdered);
if (currentInventory < quantityOrdered) {
System.out.println("Couldn't fulfill order for " + customerName +
" not enough " + itemOrdered + " (" + quantityOrdered + ")");
return false; // sorry, we sold out
}
inventoryMap.put(itemOrdered, currentInventory - quantityOrdered);
CustomerOrder order = new CustomerOrder(itemOrdered, quantityOrdered, customerName);
customerOrders.add(order);
System.out.println("Order fulfilled for " + customerName + " of " +
itemOrdered + " (" + quantityOrdered + ")");
return true;
} finally {
inventoryLock.unlock();
}
}

private void checkInventoryLevels() {
try {
inventoryLock.lock();
System.out.println("------------------------------------");
inventoryMap.entrySet().stream().forEach((inventoryEntry) -> {
System.out.println("Inventory Level :" + inventoryEntry.getKey() + " " + inventoryEntry.getValue());
});
System.out.println("------------------------------------");
} finally {
inventoryLock.unlock();
}
}

private void displayOrders() {
try {
inventoryLock.lock();
customerOrders.stream().forEach((order) -> {
System.out.println(order.getQuantityOrdered() + " " +
order.getItemOrdered() + " for " + order.getCustomerName());
});
} finally {
inventoryLock.unlock();
}
}

How It Works

If you have different structures that must be modified at the same time, you need to make sure that these structures are updated atomically. An atomic operation refers to a set of instructions that is executed either as a whole or not at all. An atomic operation is visible to the rest of the program only when it is complete.

In solution 1 (atomically modifying both the inventoryMap map and the customerOrders list), you pick a “principal” collection on which you will lock (the inventoryMap). By locking on the principal collection, you guarantee that if another thread tries to lock on the same principal collection, it will have to wait until the current executing thread releases the lock on the collection.

Note Notice that even though displayOrders() doesn’t use the inventoryMap, you still synchronize on it (in solution 1). Because the inventoryMap is the principal collection, even operations done on secondary collections still need to be protected by synchronizing on it.

Solution 2 is more explicit, offering an independent lock that is used to coordinate the atomic operations instead of picking a principal collection. Locking refers to the ability of the JVM to restrict certain code paths to be executed by only one thread. Threads try to obtain the lock (locks are provided, for example, by a ReentrantLock instance, as shown in the example), and the lock can be given to only one thread at a time. If other threads try to acquire the same lock, they are suspended (WAIT) until the lock becomes available. The lock becomes available when the thread that currently holds the lock releases it. When a lock is released, it can then be acquired by one (and only one) of the threads that were waiting for that lock.

Locks by default are not “fair.” In other words, the order in which threads requested the lock is not kept; this allows for a very fast locking/unlocking implementation in the JVM, and in most situations it is perfectly fine to use unfair locks. On a very highly contended lock, if there is a requirement to evenly distribute the lock (make it fair), you do so by constructing the lock with its fairness flag set.
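For example, a fair ReentrantLock is requested through its constructor:

Lock fairInventoryLock = new ReentrantLock(true); // true requests the fair ordering policy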

In solution 2, calling the inventoryLock.lock() method either acquires the lock and continues, or suspends execution (WAIT) until the lock can be acquired. Once the lock is acquired, no other thread will be able to execute within the locked block. At the end of the block, the lock is released by calling inventoryLock.unlock().

It is common practice when working with Lock objects (ReentrantLock, ReadLock, and WriteLock) to surround the use of these Lock objects with a try/finally clause. After opening the try block, the first instruction is a call to the lock.lock() method. This guarantees that the first instruction executed is the acquisition of the lock. The release of the lock (by calling lock.unlock()) is done in the matching finally block. In the event of a RuntimeException occurring while you hold the lock, unlocking within the finally clause ensures that you don’t “keep” the lock and prevent other threads from acquiring it.

The ReentrantLock object offers additional features that the synchronized statement doesn’t. As an example, ReentrantLock has the tryLock() method, which acquires the lock only if no other thread holds it (the method doesn’t make the invoking thread wait). If another thread holds the lock, the method returns false and the invoking thread continues executing. It is better to use the synchronized keyword for synchronization and to use ReentrantLock only when its features are needed. For more information on the other methods provided by ReentrantLock, visit http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantLock.html.
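The following sketch shows tryLock() with the inventoryLock from solution 2; the thread skips the update instead of waiting when the lock is busy:

if (inventoryLock.tryLock()) {
    try {
        // the lock was acquired; it is safe to touch the inventory here
    } finally {
        inventoryLock.unlock();
    }
} else {
    // another thread holds the lock; do something else instead of blocking
    System.out.println("Inventory is busy; skipping this update");
}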

Tip While this is only a recipe book and proper threading techniques span their own volumes, it is important to raise awareness of deadlocks. Deadlocks occur when two locks are involved and are acquired in the reverse order by another thread. The simplest way to avoid a deadlock is to avoid letting the lock “escape.” This means that code holding the lock should not call other methods that could possibly acquire a different lock. If that’s not possible, release the lock before calling such a method.
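As a minimal sketch of the problem (lockA, lockB, and the two methods are hypothetical), the following methods acquire the same two locks in opposite order and can deadlock if two threads call them at the same time; acquiring the locks in one agreed-upon order in both methods avoids the deadlock:

Lock lockA = new ReentrantLock();
Lock lockB = new ReentrantLock();

private void firstOperation() {
    lockA.lock();          // thread 1 holds lockA...
    try {
        lockB.lock();      // ...and waits here for lockB
        try {
            // work with both resources
        } finally {
            lockB.unlock();
        }
    } finally {
        lockA.unlock();
    }
}

private void secondOperation() {
    lockB.lock();          // thread 2 holds lockB...
    try {
        lockA.lock();      // ...and waits here for lockA: deadlock
        try {
            // work with both resources
        } finally {
            lockA.unlock();
        }
    } finally {
        lockB.unlock();
    }
}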

Care should be taken that any operation that refers to one or both collections is protected by the same lock. Operations that depend on the result of one collection to query the second collection need to be executed atomically; they need to be done as a unit in which neither collection can change until the operation is completed.

10-6. Splitting Work into Separate Threads

Problem

You have work that can be split into separate threads and want to maximize the use of available CPU resources.

Solution

Use a ThreadPoolExecutor instance, which allows you to break the work into discrete tasks. In the following example, a BlockingQueue is created and filled with Runnable objects, and it is then passed to the ThreadPoolExecutor instance. The ThreadPoolExecutor is initialized and started by calling the prestartAllCoreThreads() method, and you then wait until all the Runnable objects are done executing by calling the shutdown() method, followed by the awaitTermination() method:

private void start() throws InterruptedException {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
for (int i =0;i < 10;i++) {
final int localI = i;
queue.add((Runnable) () -> {
doExpensiveOperation(localI);
});
}
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,1000,
TimeUnit.MILLISECONDS, queue);
executor.prestartAllCoreThreads();
executor.shutdown();
executor.awaitTermination(100000,TimeUnit.SECONDS);

System.out.println("Look ma! all operations were completed");
}

How It Works

A ThreadPoolExecutor consists of two components: the Queue of tasks to be executed and the Executor, which determines how to execute the tasks. The Queue is filled with Runnable objects, whose run() methods contain the code to be executed.

The Queue used by a ThreadPoolExecutor is an implementer of the BlockingQueue interface. The BlockingQueue interface denotes a queue in which the consumers of the queue will wait (be suspended) if there are no elements within the Queue. This is necessary for the ThreadPoolExecutor to work efficiently.

The first step is to fill the Queue with the tasks that need to be executed in parallel. This is done by calling the Queue’s add() method and passing to it a class that implements the Runnable interface. Once that’s done, the executor is initialized.

The ThreadPoolExecutor constructor has many parameter options; the one used in the solution is the simplest. Table 10-1 has a description of each parameter.

Table 10-1. ThreadPoolExecutor’s Parameters

CorePoolSize: The minimum number of threads that are created as tasks are submitted.

MaximumPoolSize: The maximum number of threads that the Executor will create.

KeepAliveTime: The time that idle threads will wait for work before being disposed of (as long as the number of live threads is still more than the CorePoolSize).

TimeUnit: The unit in which the KeepAliveTime is expressed (that is, TimeUnit.SECONDS, TimeUnit.MILLISECONDS).

WorkQueue: The BlockingQueue that contains the tasks to be processed by the Executor.

After the ThreadPoolExecutor is initialized, you call the prestartAllCoreThreads() method. This method “warms up” the ThreadPoolExecutor by creating the number of threads specified by CorePoolSize and starts actively consuming tasks from the Queue if it is not empty.

Call the shutdown() method of the ThreadPoolExecutor to begin an orderly termination. By calling this method, the ThreadPoolExecutor is instructed to accept no new tasks (previously submitted tasks will finish processing). This is the first step in the orderly termination of a ThreadPoolExecutor. Then call the awaitTermination() method to wait for all the tasks in the ThreadPoolExecutor to be done. This method forces the main thread to wait until all the Runnables in the ThreadPoolExecutor's queue have completed executing. After all the Runnables have executed, the main thread wakes up and continues.

Note A ThreadPoolExecutor needs to be configured correctly to maximize CPU usage. The most efficient number of threads for an executor depends on the type of tasks that are submitted. If the tasks are CPU-intensive, an executor with roughly as many threads as there are cores is ideal. If the tasks are I/O-intensive, the executor should have more threads than the number of cores; the more I/O-bound the tasks, the higher the number of threads.
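A common starting point (a sketch, not a hard rule) is to size the pool from the number of available cores reported by the runtime; the multipliers below are illustrative only:

int cores = Runtime.getRuntime().availableProcessors();
// CPU-bound work: roughly one thread per core.
ThreadPoolExecutor cpuBoundExecutor = new ThreadPoolExecutor(cores, cores,
        1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
// I/O-bound work: more threads than cores; tune the multiplier by measurement.
ThreadPoolExecutor ioBoundExecutor = new ThreadPoolExecutor(cores * 4, cores * 4,
        1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());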

10-7. Coordinating Threads

Problem

Your application requires that two or more threads be coordinated to work in unison.

Solution 1

Threads can be coordinated by using wait/notify synchronization. In this solution, the main thread waits on the objectToSync object until the database-loading thread finishes executing. Once the database-loading thread is finished, it notifies objectToSync so that whoever is waiting on it can continue executing. The same process occurs when loading the orders into the system. The main thread waits on objectToSync until the orders-loading thread notifies it to continue by calling the objectToSync.notify() method. After ensuring that both the inventory and the orders are loaded, the main thread executes the processOrders() method to process all orders.

private final Object objectToSync = new Object();

private void start() {
loadItems();

Thread inventoryThread = new Thread(() -> {
System.out.println("Loading Inventory from Database...");
loadInventory();
synchronized (objectToSync) {
objectToSync.notify();
}
});

synchronized (objectToSync) {
inventoryThread.start();
try {
objectToSync.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

Thread ordersThread = new Thread(() -> {

System.out.println("Loading Orders from XML Web service...");
loadOrders();
synchronized (objectToSync) {
objectToSync.notify();

}
});

synchronized (objectToSync) {
ordersThread.start();
try {
objectToSync.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
processOrders();
}

Solution 2

You can control when the main thread continues by using a CountDownLatch object. In the following code, a CountDownLatch with an initial value of 2 is created; then the two threads for loading the inventory and loading the order information are created and started. As each of the two threads finishes executing, it calls the CountDownLatch’s countDown() method, which decrements the latch’s value by one. The main thread waits until the CountDownLatch reaches 0, at which point it resumes execution.

CountDownLatch latch = new CountDownLatch(2);

private void start() {
loadItems();

Thread inventoryThread = new Thread(() -> {
System.out.println("Loading Inventory from Database...");
loadInventory();
latch.countDown();
});

inventoryThread.start();

Thread ordersThread = new Thread(() -> {
System.out.println("Loading Orders from XML Web service...");
loadOrders();
latch.countDown();
});

ordersThread.start();

try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

processOrders();

}

Solution 3

By using Thread.join(), you can wait for a thread to finish executing. The following example has a thread for loading the inventory and another thread for loading the orders. Once each thread is started, a call to inventoryThread.join() will make the main thread wait for the inventoryThread to finish executing before continuing.

private void start() {
loadItems();

Thread inventoryThread = new Thread(() -> {
System.out.println("Loading Inventory from Database...");
loadInventory();
});

inventoryThread.start();
try {
inventoryThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

Thread ordersThread = new Thread(() -> {
System.out.println("Loading Orders from XML Web service...");
loadOrders();
});

ordersThread.start();
try {
ordersThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
processOrders();
}

How It Works

There are many ways of coordinating threads in Java, and these coordination efforts rely on the notion of making a thread wait. When a thread waits, it suspends execution (it doesn’t continue to the next instruction and is removed from the JVM’s thread scheduler). If a thread is waiting, it can then be awakened by notifying it. In Java’s concurrency lingo, the word notify implies that a thread will resume execution (the JVM will add the thread back to the thread scheduler). So in the natural course of thread coordination, the most common sequence of events is a main thread waiting, and a secondary thread then notifying the main thread to continue (or wake up). Even so, there is the possibility of a waiting thread being interrupted by some other event. When a thread is interrupted, it doesn’t continue to the next instruction, but instead throws an InterruptedException, which is a way of signaling that even though the thread was waiting for something to happen, some other event happened that requires the thread’s attention. This is better illustrated by the following example:

BlockingQueue queue = new LinkedBlockingQueue();
while (true) {
synchronized (this) {
Object itemToProcess = queue.take();
processItem (itemToProcess);
}
}

If you look at the previous code, the thread that executes it would never terminate because it loops forever and waits for an item to be processed. If there are no items in the Queue, the main thread waits until something is added to the Queue from another thread. You couldn’t gracefully shut down the previous code (especially if the thread running the loop is not a Daemon thread). The following version adds a way out:

BlockingQueue queue = new LinkedBlockingQueue();
while (true) {
synchronized (this) {
Object itemToProcess = null;
try {
itemToProcess = queue.take();
} catch (InterruptedException e) {
return;
}
processItem (itemToProcess);
}
}

The new code now has the ability to “escape” the infinite loop. From another thread, you can call thread.interrupt(), which causes the blocked take() call to throw an InterruptedException that is then caught by the catch clause. The infinite loop can be exited within this clause.

InterruptedExceptions are a way of sending extra information to waiting (or sleeping) threads so that they may handle a different scenario (for example, an orderly program shutdown). For this reason, every operation that changes the state of the thread to sleep/wait will have to be surrounded by a try/catch block that can catch the InterruptedException. This is one of the cases in which the exception (InterruptedException) is not really an error but more of a way of signaling between threads that something has occurred that requires attention.

Solution 1 demonstrates the most common (oldest) form of coordination. The solution requires making a thread wait and suspending execution until the thread gets notified (or awakened) by another thread.

For solution 1 to work, the originating thread needs to acquire a lock. This lock will then be the “phone number” on which another thread can notify the originating thread to wake up. After the originating thread acquires the lock (phone number), it proceeds to wait. As soon as the wait() method is called, the lock is released, allowing other threads to acquire the same lock. The secondary thread then proceeds to acquire the lock (the phone number) and then notifies (which, in fact, would be like dialing a wake-up call) the originating thread. After the notification, the originating thread resumes execution.

In the solution 1 code, the lock is a dummy object identified as objectToSync. In practice, the object on which locks are waiting and notifying could be any valid instance object in Java; for example, we could have used the this reference to make the main thread wait (and within the threads we could have used the Recipe10_7_1.this reference to notify the main thread to continue).

The main advantage of using this technique is the explicitness of controlling on whom to wait and when to notify (and the ability to notify all threads that are waiting on the same object; see the following tip).

Tip Multiple threads can wait on the same lock (the same phone number to be awakened). When a secondary thread calls notify(), it will wake up one of the “waiting” threads (there is no fairness about which one is awakened). Sometimes you will need to notify all the threads; in that case, call the notifyAll() method instead of the notify() method. This is mostly used when preparing many threads to take on work that has not yet finished being set up.

Solution 2 uses a more modern approach to notification, as it involves a CountDownLatch. When setting up, specify how many “counts” the latch will have. The main thread then waits (stops execution) by calling the CountDownLatch’s await() method until the latch counts down to 0. As each worker thread completes, it calls the latch.countDown() method, which decrements the latch’s current count value. When the latch’s value reaches 0, the main thread that was waiting on the CountDownLatch wakes up and continues execution.

The main advantage of using CountDownLatches is that it is possible to spawn multiple tasks at the same time and just wait for all of them to complete. (In the solution example, we didn’t need to wait until one thread or the other completed before continuing; they were all started, and when the latch reached 0, the main thread continued.)

Solution 3 offers an approach in which we have direct access to the thread we want to wait on. For the main thread, it’s just a matter of calling the secondary thread’s join() method. The main thread will then wait (stop executing) until the secondary thread completes.

The advantage of this method is that it doesn’t require the secondary threads to know about any synchronization mechanism. As long as the secondary threads terminate execution, the main thread can wait on them.

10-8. Creating Thread-Safe Objects

Problem

You need to create an object that is thread-safe because it will be accessed from multiple threads.

Solution 1

Use synchronized getters and setters and protect critical regions that change state. In the following example, an object is created with getters and setters that are synchronized for each internal variable. The critical regions are protected by using the synchronized(this) lock:

class CustomerOrder {
private String itemOrdered;
private int quantityOrdered;
private String customerName;

public CustomerOrder() {

}

public double calculateOrderTotal (double price) {
synchronized (this) {
return getQuantityOrdered()*price;
}
}

public synchronized String getItemOrdered() {
return itemOrdered;
}

public synchronized int getQuantityOrdered() {
return quantityOrdered;
}

public synchronized String getCustomerName() {
return customerName;
}

public synchronized void setItemOrdered(String itemOrdered) {
this.itemOrdered = itemOrdered;
}

public synchronized void setQuantityOrdered(int quantityOrdered) {
this.quantityOrdered = quantityOrdered;
}

public synchronized void setCustomerName(String customerName) {
this.customerName = customerName;
}
}

Solution 2

Create an immutable object (an object that, once created, doesn’t change its internal state). In the following code, the internal variables of the object are declared final and are assigned at construction. By doing so, it is guaranteed that the object is immutable:

class ImmutableCustomerOrder {
final private String itemOrdered;
final private int quantityOrdered;
final private String customerName;

ImmutableCustomerOrder(String itemOrdered, int quantityOrdered, String customerName) {
this.itemOrdered = itemOrdered;
this.quantityOrdered = quantityOrdered;
this.customerName = customerName;
}

public String getItemOrdered() {
return itemOrdered;
}

public int getQuantityOrdered() {
return quantityOrdered;
}

public String getCustomerName() {
return customerName;
}

public synchronized double calculateOrderTotal (double price) {
return getQuantityOrdered()*price;
}
}

How It Works

Solution 1 relies on the principle that a lock protects any change made to the object. Using the synchronized keyword on an instance method is a shortcut for wrapping the method body in a synchronized (this) block. By synchronizing your getters and setters (and any other operation that alters the internal state of your object), you guarantee that the object is consistent. Also, it is important that any operations that should occur as a unit (say, something that modifies two collections at the same time, as shown in Recipe 10-5) are done within a method of the object and are protected by the synchronized keyword.

For instance, if an object offers a getSize() method as well as getItemNumber(int index), it would be unsafe to write object.getItemNumber(object.getSize() - 1). Even though the statement looks concise, another thread can alter the contents of the object between getting the size and getting the item number. Instead, it is safer to create an object.getLastElement() method, which atomically figures out the size and returns the last element.
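A minimal sketch of such a composite operation (the class and field names are illustrative only):

class ItemContainer {
    private final List<String> items = new ArrayList<>();

    public synchronized void addItem(String item) {
        items.add(item);
    }

    // Reads the size and the element under one lock, as a single atomic operation.
    public synchronized String getLastElement() {
        if (items.isEmpty()) {
            return null;
        }
        return items.get(items.size() - 1);
    }
}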

Solution 2 relies on the property of immutable objects. Immutable objects cannot change their internal state, and objects that cannot change their internal state (are immutable) are by definition thread-safe. If you need to modify the immutable object due to an event, instead of explicitly changing its property, create a new object with the changed properties. This new object then takes the place of the old object, and on future requests for the object, the new immutable object is returned. This is by far the easiest (albeit verbose) method for creating thread-safe code.
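For example, instead of a setter, an immutable class can expose a copy method that returns a new instance carrying the changed value. The following sketch adds a hypothetical withQuantityOrdered() method to the ImmutableCustomerOrder class from solution 2:

public ImmutableCustomerOrder withQuantityOrdered(int newQuantity) {
    // The original instance is untouched; callers replace their reference with the returned object.
    return new ImmutableCustomerOrder(itemOrdered, newQuantity, customerName);
}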

10-9. Implementing Thread-Safe Counters

Problem

You need a counter that is thread-safe so that it can be incremented from within different execution threads.

Solution

By using the inherently thread-safe Atomic objects, it is possible to create a counter that guarantees thread safety and has an optimized synchronization strategy. In the following code, an Order object is created, and it requires a unique order ID that is generated using the AtomicLong incrementAndGet() method:

AtomicLong orderIdGenerator = new AtomicLong(0);

for (int i =0;i < 10;i++) {
Thread orderCreationThread = new Thread(() -> {
for (int i1 = 0; i1 < 10; i1++) {
createOrder(Thread.currentThread().getName());
}
});
orderCreationThread.setName("Order Creation Thread "+i);
orderCreationThread.start();
}

//////////////////////////////////////////////////////
private void createOrder(String name) {
long orderId = orderIdGenerator.incrementAndGet();
Order order = new Order(name, orderId);
orders.add(order);
}

How It Works

AtomicLong (and its cousin AtomicInteger) are built to be used safely in concurrent environments. They have methods to atomically increment (and get) the changed value. Even if hundreds of threads call the AtomicLong’s incrementAndGet() method, each returned value will be unique.

If you need to make decisions and update the variables, always use the atomic operations that are offered by AtomicLong; for example, compareAndSet. If not, your code will not be thread-safe (as any check-then-act operation needs to be atomic) unless you externally protect the atomic reference by using your own locks (see Recipe 10-7).
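As a sketch of a check-then-act performed atomically, the following loop caps the generator at a hypothetical maximum by using compareAndSet(); the update is retried if another thread changed the value in between:

long current;
long next;
do {
    current = orderIdGenerator.get();
    if (current >= 1_000_000) { // hypothetical upper limit
        throw new IllegalStateException("Order id space exhausted");
    }
    next = current + 1;
} while (!orderIdGenerator.compareAndSet(current, next));
// At this point, next is an id that no other thread obtained.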

The following code illustrates several code safety issues to be aware of. First, changing a long value may be done in two memory write operations (as allowed by the Java Memory Model), and thus two threads could end up overlapping those two operations in what might on the surface appear to be thread-safe code. The result would be a completely unexpected (and likely wrong) long value:

long counter = 0;

public long incrementCounter() {
return counter++;
}

This code also suffers from unsafe publication, which refers to the fact that a variable might be cached locally (in the CPU’s internal cache) and might not be committed to main memory. If another thread (executing in another CPU) happens to be reading the variable from main memory, that other thread may miss the changes made by the first thread. The changed value may be cached by the first thread’s CPU, and not yet committed to main memory where the second thread can see it. For safe publication, you must use the volatile Java modifier (see http://download.oracle.com/javase/tutorial/essential/concurrency/atomic.html).

A final issue with the preceding code is that it is not atomic. Even though it looks like there is only one operation to increment the counter, in reality there are two operations that occur at the machine-language level (a retrieve of the variable and then an increment). There could be two or more threads that obtain the same value as they both retrieve the variable but haven’t incremented it yet. Then all the threads increment the counter to the same number.
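All three issues go away if the counter is held in an AtomicLong, whose single read-modify-write operation is both atomic and safely published (a sketch; getAndIncrement() mirrors the post-increment semantics of counter++):

AtomicLong counter = new AtomicLong(0);

public long incrementCounter() {
    return counter.getAndIncrement(); // atomic and visible to all threads
}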

10-10. Breaking Down Tasks into Discrete Units of Work

Problem

You have an algorithm that benefits from using a divide-and-conquer strategy, which refers to the ability to break down a unit of work into two separate subunits and then piece together the results from those subunits. The subunits can in turn be broken down into more subunits of work until the work is small enough to simply be executed. By breaking down the unit of work into subunits, you can take advantage of the multicore nature of today’s processors with minimum pain.

Solution

The new Fork/Join framework makes applying the divide-and-conquer strategy straightforward. The following example creates a representation of the Game of Life. The code uses the Fork/Join framework to speed up the calculation for each iteration when advancing from one generation to the next:

////////////////////////////////////////////////////////////////

ForkJoinPool pool = new ForkJoinPool();
long i = 0;

while (shouldRun) {
i++;
final boolean[][] newBoard = new boolean[lifeBoard.length][lifeBoard[0].length];
long startTime = System.nanoTime();
GameOfLifeAdvancer advancer = new GameOfLifeAdvancer(lifeBoard, 0,0, lifeBoard.length-1,
lifeBoard[0].length-1,newBoard);
pool.invoke(advancer);
long endTime = System.nanoTime();
if (i % 100 == 0 ) {
System.out.println("Taking "+(endTime-startTime)/1000 + "ms");
}
SwingUtilities.invokeAndWait(() -> {
model.setBoard(newBoard);
lifeTable.repaint();
});
lifeBoard = newBoard;
}
////////////////////////////////////////////////////////////////

class GameOfLifeAdvancer extends RecursiveAction{

private boolean[][] originalBoard;
private boolean[][] destinationBoard;
private int startRow;
private int endRow;
private int endCol;
private int startCol;

GameOfLifeAdvancer(boolean[][] originalBoard, int startRow, int startCol, int endRow,
int endCol, boolean [][] destinationBoard) {
this.originalBoard = originalBoard;
this.destinationBoard = destinationBoard;
this.startRow = startRow;
this.endRow = endRow;
this.endCol = endCol;
this.startCol = startCol;
}

private void computeDirectly() {
for (int row = startRow; row <= endRow;row++) {
for (int col = startCol; col <= endCol; col++) {
int numberOfNeighbors = getNumberOfNeighbors (row, col);
if (originalBoard[row][col]) {
destinationBoard[row][col] = true;
if (numberOfNeighbors < 2) destinationBoard[row][col] = false;
if (numberOfNeighbors > 3) destinationBoard[row][col] = false;
} else {
destinationBoard[row][col] = false;
if (numberOfNeighbors == 3) destinationBoard[row][col] = true;
}
}
}
}

private int getNumberOfNeighbors(int row, int col) {
int neighborCount = 0;
for (int leftIndex = -1; leftIndex < 2; leftIndex++) {
for (int topIndex = -1; topIndex < 2; topIndex++) {
if ((leftIndex == 0) && (topIndex == 0)) continue; // skip own
int neighbourRowIndex = row + leftIndex;
int neighbourColIndex = col + topIndex;
if (neighbourRowIndex<0) neighbourRowIndex = originalBoard.length +
neighbourRowIndex;
if (neighbourColIndex<0) neighbourColIndex = originalBoard[0].length +
neighbourColIndex ;
boolean neighbour = originalBoard[neighbourRowIndex % originalBoard.length][neighbourColIndex % originalBoard[0].length];
if (neighbour) neighborCount++;
}
}
return neighborCount;
}

@Override
protected void compute() {
if (getArea() < 20) {
computeDirectly();
return;
}
int halfRows = (endRow - startRow) / 2;
int halfCols = (endCol - startCol) / 2;
if (halfRows > halfCols) {
// split the rows
invokeAll(new GameOfLifeAdvancer(originalBoard, startRow, startCol,
startRow+halfRows, endCol,destinationBoard),
new GameOfLifeAdvancer(originalBoard, startRow+halfRows+1, startCol,
endRow, endCol,destinationBoard));
} else {
invokeAll(new GameOfLifeAdvancer(originalBoard, startRow, startCol, endRow,
startCol+ halfCols,destinationBoard),
new GameOfLifeAdvancer(originalBoard, startRow, startCol+halfCols+1,
endRow, endCol,destinationBoard));
}
}

private int getArea() { return (endRow - startRow) * (endCol - startCol); }

}

How It Works

The Fork/Join framework can be used for breaking down tasks into discrete units of work. The first part of the solution creates a ForkJoinPool object. The default constructor provides reasonable defaults (such as creating as many threads as there are CPU cores) and sets up an entry point to submit divide-and-conquer work. While the ForkJoinPool inherits from ExecutorService, it is best suited to handle tasks that extend from RecursiveAction. The ForkJoinPool object has the invoke(RecursiveAction) method, which will take a RecursiveAction object and apply the divide-and-conquer strategy.

The second part of the solution creates the GameOfLifeAdvancer class, which extends the RecursiveAction class. By extending the RecursiveAction class, the work can be split. The GameOfLifeAdvancer class advances the Game of Life board to the next generation. The constructor takes a two-dimensional Boolean array (which represents a Game of Life board), a start row/column, an end row/column, and a destination two-dimensional Boolean array, on which the result of advancing the Game of Life for one generation is collected.

The GameOfLifeAdvancer is required to implement the compute() method. In this method, you determine how much work there is to be completed. If the work is small enough, it is completed directly (achieved by calling the computeDirectly() method and returning). If the work is not small enough, the method splits the work by creating two GameOfLifeAdvancer instances that each process only half of the current GameOfLifeAdvancer’s work. This is done either by splitting the number of rows to be processed into two chunks or by splitting the number of columns into two chunks. The two GameOfLifeAdvancer instances are then passed to the Fork/Join pool by calling the invokeAll() method of the RecursiveAction class. The invokeAll() method takes the two instances of GameOfLifeAdvancer (it can take as many as needed) and waits until both are finished executing (that is the meaning of the -all suffix in the invokeAll() method name; it waits for all of the submitted tasks to be completed before returning control).

In this way, the GameOfLifeAdvancer instance is broken down into new GameOfLifeAdvancer instances that each processes only part of the Game of Life board. Each instance waits for all the subordinate parts to be completed before returning control to the caller. The resulting division of work can take advantage of the multiple CPUs available in the typical system today.

Tip The ForkJoinPool is generally more efficient than an ExecutorService because it implements a work-stealing policy. Each thread has a Queue of work to complete; if the Queue of any thread is empty, the thread will “steal” work from another thread’s queue, making more efficient use of CPU processing power.

10-11. Updating a Common Value Across Multiple Threads

Problem

Your application needs to safely maintain a single summed value across multiple threads.

Solution

Utilize a DoubleAdder or LongAdder to contain the value that is being summed across multiple threads in order to ensure safe handling. In the following example, two threads are adding values to a DoubleAdder at the same time, and in the end the value is summed and displayed.

DoubleAdder da = new DoubleAdder();

private void start() {

Thread thread1 = new Thread(() -> {
for (int i1 = 0; i1 < 10; i1++) {
da.add(i1);
System.out.println("Adding " + i1);
}
});

Thread thread2 = new Thread(() -> {
for (int i1 = 0; i1 < 10; i1++) {
da.add(i1);
System.out.println("Adding " + i1);
}
});

thread1.start();
thread2.start();

try {
System.out.println("Sleep while summing....");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("The sum is: " + da.doubleValue());

}

Results:

Adding 0
Adding 1
Adding 2
Adding 3
Adding 4
Adding 5
Adding 6
Adding 7
Adding 0
Adding 8
Adding 9
Adding 1
Adding 2
Adding 3
Adding 4
Adding 5
Adding 6
Adding 7
Adding 8
Adding 9
The sum is: 90.0

How It Works

Prior to the release of Java 8, it was important to utilize atomic variables when working with values across multiple threads. Atomic variables prevent thread interference without causing the blocking that synchronized access may cause in some cases. Java 8 introduces a new line of classes that provide faster throughput than the standard atomic variables. The java.util.concurrent.atomic.DoubleAdder and java.util.concurrent.atomic.LongAdder classes are preferable to atomic variables such as AtomicLong in most cases when the value may be accessed and updated across multiple threads. Both DoubleAdder and LongAdder extend Number, and they are useful when summing values across threads, especially under high contention.

In the solution, a DoubleAdder is used to sum numbers across two different threads. Using the add() method, various numbers are “added” to the DoubleAdder value. After the threads have had ample time to perform their work, the doubleValue() method is called upon to return the sum of all values as a double.

Both the DoubleAdder and LongAdder classes contain similar methods, although the LongAdder does contain a couple of additional helper methods for incrementing and decrementing the value of the adder. Table 10-2 shows the methods that are contained within each of the classes.

Table 10-2. DoubleAdder and LongAdder Methods

add(): Adds the given value.

decrement(): (LongAdder only.) Equivalent to add(-1).

doubleValue(): Returns the sum() as a double value (after performing a widening primitive conversion on LongAdder).

floatValue(): Returns the sum() as a float value after performing a widening primitive conversion.

increment(): (LongAdder only.) Equivalent to add(1).

intValue(): Returns the sum() as an int value after performing a narrowing conversion.

longValue(): Returns the sum() as a long value (after performing a narrowing conversion on DoubleAdder).

reset(): Resets the variable’s values to zero.

sum(): Returns the current summed value.

sumThenReset(): Returns the current summed value and then resets the variable’s values to zero.

toString(): Returns the String representation of the summed value.
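As a small sketch of the LongAdder variant, the class works well as a hit counter shared by many threads (the variable name is illustrative):

LongAdder requestCount = new LongAdder();

// Called concurrently from many threads.
requestCount.increment();

// Read the total once the updates of interest have completed.
System.out.println("Requests so far: " + requestCount.sum());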

Tip In the same family as DoubleAdder and LongAdder are the DoubleAccumulator and LongAccumulator classes. These classes allow one or more variables that are maintained across threads to be updated using a supplied function. Both of these classes accept an accumulator function as the first argument and an identity as the second argument. When updates are applied across threads, the set of variables used to perform the calculations may grow dynamically to reduce contention. For more information regarding these classes, which are new to Java 8, refer to the online documentation: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html.
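For example, a LongAccumulator can track the largest value reported by any thread; the sketch below uses Long::max as the accumulator function and Long.MIN_VALUE as the identity:

LongAccumulator maxSeen = new LongAccumulator(Long::max, Long.MIN_VALUE);

// Threads report observations concurrently.
maxSeen.accumulate(42);
maxSeen.accumulate(7);

System.out.println("Largest value observed: " + maxSeen.get());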

Summary

It is important to understand the fundamentals of concurrency when developing applications. There is nothing worse than testing an application successfully and then having it fail with a deadlock once it is released into production. This chapter started with the basics, demonstrating how to spawn a background task. It then went on to cover various techniques for handling concurrency, from creating threads to using the Fork/Join framework to divide work into discrete tasks.