Parallel data processing and performance - Functional-style data processing - Java 8 in Action: Lambdas, streams, and functional-style programming (2015)

Java 8 in Action: Lambdas, streams, and functional-style programming (2015)

Part 2. Functional-style data processing

Chapter 7. Parallel data processing and performance

This chapter covers

· Processing data in parallel with parallel streams

· Performance analysis of parallel streams

· The fork/join framework

· Splitting a stream of data using a Spliterator

In the last three chapters, you’ve seen how the new Stream interface lets you manipulate collections of data in a declarative way. We also explained that the shift from external to internal iteration enables the native Java library to gain control over processing the elements of a stream. This approach relieves Java developers from explicitly implementing optimizations necessary to speed up the processing of collections of data. By far the most important benefit is the possibility of executing a pipeline of operations on these collections that automatically makes use of the multiple cores on your computer.

For instance, before Java 7, processing a collection of data in parallel was extremely cumbersome. First, you needed to explicitly split the data structure containing your data into subparts. Second, you needed to assign each of these subparts to a different thread. Third, you needed to synchronize them opportunely to avoid unwanted race conditions, wait for the completion of all threads, and finally combine the partial results. Java 7 introduced a framework called fork/join to perform these operations more consistently and in a less error-prone way. We explore this framework in section 7.2.

In this chapter, you’ll discover how the Stream interface gives you the opportunity to execute operations in parallel on a collection of data without much effort. It lets you declaratively turn a sequential stream into a parallel one. Moreover, you’ll see how Java can make this magic happen or, more practically, how parallel streams work under the hood by employing the fork/join framework introduced in Java 7. You’ll also discover that it’s important to know how parallel streams work internally, because if you ignore this aspect, you could obtain unexpected (and very likely wrong) results by misusing them.

In particular we’ll demonstrate that the way a parallel stream gets divided into chunks, before processing the different chunks in parallel, can in some cases be the origin of these incorrect and apparently unexplainable results. For this reason, you’ll learn how to take control of this splitting process by implementing and using your own Spliterator.

7.1. Parallel streams

In chapter 4, we briefly mentioned that the Stream interface allows you to process its elements in parallel in a very convenient way: it’s possible to turn a collection into a parallel stream by invoking the method parallelStream on the collection source. A parallel stream is a stream that splits its elements into multiple chunks, processing each chunk with a different thread. Thus, you can automatically partition the workload of a given operation on all the cores of your multicore processor and keep all of them equally busy. Let’s experiment with this idea by using a simple example.

Let’s suppose you need to write a method accepting a number n as argument and returning the sum of all the numbers from 1 to the given argument. A straightforward (perhaps naïve) approach is to generate an infinite stream of numbers, limiting it to the passed number, and then reduce the resulting stream with a BinaryOperator that just sums two numbers, as follows:

In more traditional Java terms, this code is equivalent to its iterative counterpart:

public static long iterativeSum(long n) {

long result = 0;

for (long i = 1L; i <= n; i++) {

result += i;

}

return result;

}

This operation seems to be a good candidate to leverage parallelization, especially for large values of n. But where do you start? Do you synchronize on the result variable? How many threads do you use? Who does the generation of numbers? Who adds them up?

Don’t worry about all of this. It’s a much simpler problem to solve if you adopt parallel streams!

7.1.1. Turning a sequential stream into a parallel one

You can make the former functional reduction process (that is, summing) run in parallel by turning the stream into a parallel one; call the method parallel on the sequential stream:

In the previous code, the reduction process used to sum all the numbers in the stream works in a way that’s similar to what’s described in section 5.4.1. The difference is that the Stream is internally divided into multiple chunks. As a result, the reduction operation can work on the various chunks independently and in parallel, as shown in figure 7.1. Finally, the same reduction operation combines the values resulting from the partial reductions of each substream, producing the result of the reduction process on the whole initial stream.

Figure 7.1. A parallel reduction operation

Note that, in reality, calling the method parallel on a sequential stream doesn’t imply any concrete transformation on the stream itself. Internally, a boolean flag is set to signal that you want to run in parallel all the operations that follow the invocation to parallel. Similarly, you can turn a parallel stream into a sequential one by just invoking the method sequential on it. Note that you might think that you could achieve finer-grained control over which operations you want to perform in parallel and which one sequentially while traversing the stream by combining these two methods. For example, you could do something like the following:

stream.parallel()

.filter(...)

.sequential()

.map(...)

.parallel()

.reduce();

But the last call to parallel or sequential wins and affects the pipeline globally. In this example, the pipeline will be executed in parallel because that’s the last call in the pipeline.

Configuring the thread pool used by parallel streams

Looking at the stream’s parallel method, you may wonder where the threads used by the parallel stream come from, how many there are, and how you can customize the process.

Parallel streams internally use the default ForkJoinPool (you’ll learn more about the fork/join framework in section 7.2), which by default has as many threads as you have processors, as returned by Runtime.getRuntime().availableProcessors().

But you can change the size of this pool using the system property java.util.concurrent.ForkJoinPool.common.parallelism, as in the following example:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");

This is a global setting, so it will affect all the parallel streams in your code. Conversely, it currently isn’t possible to specify this value for a single parallel stream. In general, having the size of the ForkJoinPool equal to the number of processors on your machine is a meaningful default, and we strongly suggest that you not modify it unless you have a very good reason for doing so.

Returning to the numbers-summing exercise, we said that you can expect a significant performance improvement in its parallel version when running it on a multicore processor. You now have three methods executing exactly the same operation in three different ways (iterative style, sequential reduction, and parallel reduction), so let’s see which is the fastest one!

7.1.2. Measuring stream performance

We claimed that the parallelized summing method should perform better than the sequential and the iterative methods. Nevertheless, in software engineering guessing is never a good idea! Especially when optimizing performance you should always follow three golden rules: measure, measure, measure. To this purpose you can develop a method very similar to the basic harness you used in section 6.6.2 to compare the performances of the two collectors partitioning numbers into prime and nonprime, as shown in the following listing.

Listing 7.1. Measuring performance of a function summing the first n numbers

public long measureSumPerf(Function<Long, Long> adder, long n) {

long fastest = Long.MAX_VALUE;

for (int i = 0; i < 10; i++) {

long start = System.nanoTime();

long sum = adder.apply(n);

long duration = (System.nanoTime() - start) / 1_000_000;

System.out.println("Result: " + sum);

if (duration < fastest) fastest = duration;

}

return fastest;

}

Here this method takes as arguments a function and a long. It applies the function 10 times on the long passed to the method, registers the time taken by each execution in milliseconds, and returns the duration of the fastest one. Supposing that you group all the methods you developed previously into a class named ParallelStreams, you can use this harness to check how long the sequential adder function takes to sum the first 10 million natural numbers:

System.out.println("Sequential sum done in: " +

measureSumPerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");

Note that the results should be taken with a grain of salt. Many factors will influence the execution time, such as how many cores your machine supports! You can try this on your own machine by running the code available on the book’s repository. Executing it on a MacBook pro Intel i7 2.3 GHz quad-core, it prints the following:

Sequential sum done in: 97 msecs

You should expect that the iterative version using a traditional for loop runs much faster because it works at a much lower level and, more important, doesn’t need to perform any boxing or unboxing of the primitive values. If you try to measure its performance with

System.out.println("Iterative sum done in: " +

measureSumPerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");

you’ll obtain

Iterative sum done in: 2 msecs

Now let’s do the same with the parallel version of that function

System.out.println("Parallel sum done in: " +

measureSumPerf(ParallelStreams::parallelSum, 10_000_000) + " msecs" );

and see what happens:

Parallel sum done in: 164 msecs

This is quite disappointing: the parallel version of the summing method is much slower than the sequential one. How can you explain this unexpected result? There are actually two issues mixed together:

· iterate generates boxed objects, which have to be unboxed to numbers before they can be added.

· iterate is difficult to divide into independent chunks to execute in parallel.

The second issue is particularly interesting because you need to keep a mental model that some stream operations are more parallelizable than others. Specifically, the iterate operation is hard to split into chunks that can be executed independently because the input of one function application always depends on the result of the previous application, as illustrated in figure 7.2.

Figure 7.2. iterate is inherently sequential.

This means that in this specific case the reduction process isn’t proceeding as depicted in figure 7.1: the whole list of numbers isn’t available at the beginning of the reduction process, making it impossible to efficiently partition the stream in chunks to be processed in parallel. By flagging the stream as parallel, you’re just adding to the sequential processing the overhead of allocating each sum operation on a different thread.

This demonstrates how parallel programming can be tricky and sometimes counterintuitive. When misused (for example, using an operation that’s not parallel-friendly, like iterate) it can actually worsen the overall performance of your programs, so it’s mandatory to understand what happens behind the scenes when you invoke that apparently magic parallel method.

Using more specialized methods

So how can you leverage your multicore processors and use the stream to perform a parallel sum in an effective way? We discussed a method called LongStream.rangeClosed in chapter 5. This method has two benefits compared to iterate:

· LongStream.rangeClosed works on primitive long numbers directly so there’s no boxing and unboxing overhead.

· LongStream.rangeClosed produces ranges of numbers, which can be easily split into independent chunks. For example, the range 1–20 can be split into 1–5, 6–10, 11–15, and 16–20.

Let’s first see how it performs on a sequential stream to see if the overhead associated with unboxing is relevant:

public static long rangedSum(long n) {

return LongStream.rangeClosed(1, n)

.reduce(0L, Long::sum);

}

This time the output is

Ranged sum done in: 17 msecs

The numeric stream is much faster than the earlier sequential version, generated with the iterate factory method, because the numeric stream avoids all the overhead caused by all the unnecessary autoboxing and unboxing operations performed by the nonspecialized stream. This is evidence that choosing the right data structures is often more important than parallelizing the algorithm that uses them. But what happens if you try to use a parallel stream in this new version that follows?

public static long parallelRangedSum(long n) {

return LongStream.rangeClosed(1, n)

.parallel()

.reduce(0L, Long::sum);

}

Now, passing this function to your test method

System.out.println("Parallel range sum done in: " +

measureSumPerf(ParallelStreams::parallelRangedSum, 10_000_000) +

" msecs");

you obtain

Parallel range sum done in: 1 msecs

Finally, you obtain a parallel reduction that’s faster than its sequential counterpart, because this time the reduction operation can actually be executed as shown in figure 7.1. This also demonstrates that using the right data structure and then making it work in parallel guarantees the best performance.

Nevertheless, keep in mind that parallelization doesn’t come for free. The parallelization process itself requires you to recursively partition the stream, assign the reduction operation of each substream to a different thread, and then combine the results of these operations in a single value. But moving data between multiple cores is also more expensive than you might expect, so it’s important that work to be done in parallel on another core takes longer than the time required to transfer the data from one core to another. In general, there are many cases where it isn’t possible or convenient to use parallelization. But before you use a parallel Stream to make your code faster, you have to be sure that you’re using it correctly; it’s not helpful to produce a result in less time if the result will be wrong. Let’s look at a common pitfall.

7.1.3. Using parallel streams correctly

The main cause of errors generated by misuse of parallel streams is the use of algorithms that mutate some shared state. Here’s another way to implement the sum of the first n natural numbers but by mutating a shared accumulator:

public static long sideEffectSum(long n) {

Accumulator accumulator = new Accumulator();

LongStream.rangeClosed(1, n).forEach(accumulator::add);

return accumulator.total;

}

public class Accumulator {

public long total = 0;

public void add(long value) { total += value; }

}

It’s quite common to write this sort of code, especially for developers who are familiar with imperative programming paradigms. This code closely resembles what you’re used to doing when iterating imperatively a list of numbers: you initialize an accumulator and traverse the elements in the list one by one, adding them on the accumulator.

What’s wrong with this code? Unfortunately, it’s irretrievably broken because it’s fundamentally sequential. You have a data race on every access of total. And if you try to fix that with synchronization, you’ll lose all your parallelism. To understand this, let’s try to turn the Stream into a parallel one:

public static long sideEffectParallelSum(long n) {

Accumulator accumulator = new Accumulator();

LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);

return accumulator.total;

}

Try to run this last method with the harness of listing 7.1, also printing the result of each execution:

System.out.println("SideEffect parallel sum done in: " +

measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + "

msecs" );

You could obtain something like the following:

Result: 5959989000692

Result: 7425264100768

Result: 6827235020033

Result: 7192970417739

Result: 6714157975331

Result: 7497810541907

Result: 6435348440385

Result: 6999349840672

Result: 7435914379978

Result: 7715125932481

SideEffect parallel sum done in: 49 msecs

This time the performance of your method isn’t important: the only relevant thing is that each execution returns a different result, all very distant from the correct value of 50000005000000. This is caused by the fact that multiple threads are concurrently accessing the accumulator and in particular executing total += value, which, despite its appearance, isn’t an atomic operation. The origin of the problem is that the method invoked inside the forEach block has the side effect of changing the mutable state of an object shared among multiple threads. It’s mandatory to avoid these kinds of situations if you want to use parallel Streams without incurring similar bad surprises.

Now you know that shared mutable state doesn’t play well with parallel streams and with parallel computations in general. We’ll come back to this idea of avoiding mutation in chapters 13 and 14 when discussing functional programming in more detail. For now, keep in mind that avoiding shared mutable state ensures that your parallel Stream will produce the right result. Next, we’ll look at some practical advice you can use to figure out when it’s appropriate to use parallel streams to gain performance.

7.1.4. Using parallel streams effectively

In general it’s impossible (and pointless) to try to give any quantitative hint on when to use a parallel stream because any suggestion like “use a parallel stream only if you have at least one thousand (or one million or whatever number you want) elements” could be correct for a specific operation running on a specific machine, but it could be completely wrong in an even marginally different context. Nonetheless, it’s at least possible to provide some qualitative advice that could be useful when deciding whether it makes sense to use a parallel stream in a certain situation:

· If in doubt, measure. Turning a sequential stream into a parallel one is trivial but not always the right thing to do. As we already demonstrated in this section, a parallel stream isn’t always faster than the corresponding sequential version. Moreover, parallel streams can sometimes work in a counterintuitive way, so the first and most important suggestion when choosing between sequential and parallel streams is to always check their performance with an appropriate benchmark.

· Watch out for boxing. Automatic boxing and unboxing operations can dramatically hurt performance. Java 8 includes primitive streams (IntStream, LongStream, and DoubleStream) to avoid such operations, so use them when possible.

· Some operations naturally perform worse on a parallel stream than on a sequential stream. In particular, operations such as limit and findFirst that rely on the order of the elements are expensive in a parallel stream. For example, findAny will perform better than findFirstbecause it isn’t constrained to operate in the encounter order. You can always turn an ordered stream into an unordered stream by invoking the method unordered on it. So, for instance, if you need N elements of your stream and you’re not necessarily interested in the first N ones, calling limit on an unordered parallel stream may execute more efficiently than on a stream with an encounter order (for example, when the source is a List).

· Consider the total computational cost of the pipeline of operations performed by the stream. With N being the number of elements to be processed and Q the approximate cost of processing one of these elements through the stream pipeline, the product of N*Q gives a rough qualitative estimation of this cost. A higher value for Q implies a better chance of good performance when using a parallel stream.

· For a small amount of data, choosing a parallel stream is almost never a winning decision. The advantages of processing in parallel only a few elements aren’t enough to compensate for the additional cost introduced by the parallelization process.

· Take into account how well the data structure underlying the stream decomposes. For instance, an ArrayList can be split much more efficiently than a LinkedList, because the first can be evenly divided without traversing it, as it’s necessary to do with the second. Also, the primitive streams created with the range factory method can be decomposed quickly. Finally, as you’ll learn in section 7.3, you can get full control of this decomposition process by implementing your own Spliterator.

· The characteristics of a stream, and how the intermediate operations through the pipeline modify them, can change the performance of the decomposition process. For example, a SIZED stream can be divided into two equal parts, and then each part can be processed in parallel more effectively, but a filter operation can throw away an unpredictable number of elements, making the size of the stream itself unknown.

· Consider whether a terminal operation has a cheap or expensive merge step (for example, the combiner method in a Collector). If this is expensive, then the cost caused by the combination of the partial results generated by each substream can outweigh the performance benefits of a parallel stream.

Table 7.1 gives a summary of the parallel-friendliness of certain stream sources in terms of their decomposability.

Table 7.1. Stream sources and decomposability

Source

Decomposability

ArrayList

Excellent

LinkedList

Poor

IntStream.range

Excellent

Stream.iterate

Poor

HashSet

Good

TreeSet

Good

Finally, we need to emphasize that the infrastructure used behind the scenes by parallel streams to execute operations in parallel is the fork/join framework introduced in Java 7. The parallel summing example proved that it’s vital to have a good understanding of the parallel stream internals in order to use them correctly, so we’ll investigate in detail the fork/join framework in the next section.

7.2. The fork/join framework

The fork/join framework was designed to recursively split a parallelizable task into smaller tasks and then combine the results of each subtask to produce the overall result. It’s an implementation of the ExecutorService interface, which distributes those subtasks to worker threads in a thread pool, called ForkJoinPool. Let’s start by exploring how to define a task and subtasks.

7.2.1. Working with RecursiveTask

To submit tasks to this pool, you have to create a subclass of RecursiveTask<R>, where R is the type of the result produced by the parallelized task (and each of its subtasks) or of RecursiveAction if the task returns no result (it could be updating other nonlocal structures, though). To define RecursiveTasks you need only implement its single abstract method, compute:

protected abstract R compute();

This method defines both the logic of splitting the task at hand into subtasks and the algorithm to produce the result of a single subtask when it’s no longer possible or convenient to further divide it. For this reason an implementation of this method often resembles the following pseudocode:

if (task is small enough or no longer divisible) {

compute task sequentially

} else {

split task in two subtasks

call this method recursively possibly further splitting each subtask

wait for the completion of all subtasks

combine the results of each subtask

}

In general there are no precise criteria for deciding whether a given task should be further divided or not, but there are various heuristics that you can follow to help you with this decision. We clarify them in more detail in section 7.2.1. The recursive task-splitting process is visually synthesized by figure 7.3.

Figure 7.3. The fork/join process

As you might have noticed, this is nothing more than the parallel version of the well-known divide-and-conquer algorithm. To demonstrate a practical example of how to use the fork/join framework and to build on our previous examples, let’s try to calculate the sum of a range of numbers (here represented by an array of numbers long[]) using this framework. As explained, you need to first provide an implementation for the RecursiveTask class, as shown by the ForkJoinSumCalculator in the following listing.

Listing 7.2. Executing a parallel sum using the fork/join framework

Writing a method performing a parallel sum of the first n natural numbers is now pretty straightforward. You just need to pass the desired array of numbers to the constructor of ForkJoinSumCalculator:

public static long forkJoinSum(long n) {

long[] numbers = LongStream.rangeClosed(1, n).toArray();

ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);

return new ForkJoinPool().invoke(task);

}

Here, you generate an array containing the first n natural numbers using a LongStream. Then you create a ForkJoinTask (the superclass of RecursiveTask), passing this array to the public constructor of the ForkJoinSumCalculator shown in listing 7.2. Finally, you create a newForkJoinPool and pass that task to its invoke method. The value returned by this last method is the result of the task defined by the Fork-JoinSumCalculator class when executed inside the ForkJoinPool.

Note that in a real-world application, it doesn’t make sense to use more than one ForkJoinPool. For this reason, what you typically should do is instantiate it only once and keep this instance in a static field, making it a singleton, so it could be conveniently reused by any part of your software. Here, to create it you’re using its default no-argument constructor, meaning that you want to allow the pool to use all the processors available to the JVM. More precisely, this constructor will use the value returned by Runtime.availableProcessors to determine the number of threads used by the pool. Note that the availableProcessors method, despite its name, in reality returns the number of available cores, including any virtual ones due to hyperthreading.

Running the ForkJoinSumCalculator

When you pass the ForkJoinSumCalculator task to the ForkJoinPool, this task is executed by a thread of the pool that in turn calls the compute method of the task. This method checks to see if the task is small enough to be performed sequentially; otherwise, it splits the array of numbers to be summed into two halves and assigns them to two new ForkJoinSumCalculators that are scheduled to be executed by the ForkJoinPool. As a result, this process can be recursively repeated, allowing the original task to be divided into smaller tasks, until the condition used to check if it’s no longer convenient or no longer possible to further split it is met (in this case, if the number of items to be summed is less than or equal to 10,000). At this point, the result of each subtask is computed sequentially, and the (implicit) binary tree of tasks created by the forking process is traversed back toward its root. The result of the task is then computed, combining the partial results of each subtask. This process is shown in figure 7.4.

Figure 7.4. The fork/join algorithm

Once again you can check the performance of the summing method explicitly using the fork/join framework with the harness developed at the beginning of this chapter:

System.out.println("ForkJoin sum done in: " + measureSumPerf(

ForkJoinSumCalculator::forkJoinSum, 10_000_000) + " msecs" );

In this case it produces the following output:

ForkJoin sum done in: 41 msecs

Here, the performance is worse than the version using the parallel stream, but only because you’re obliged to put the whole stream of numbers into a long[] before being allowed to use it in the ForkJoinSumCalculator task.

7.2.2. Best practices for using the fork/join framework

Even though the fork/join framework is relatively easy to use, unfortunately it’s also easy to misuse. Here are a few best practices to leverage it effectively:

· Invoking the join method on a task blocks the caller until the result produced by that task is ready. For this reason, it’s necessary to call it after the computation of both subtasks has been started. Otherwise, you’ll end up with a slower and more complex version of your original sequential algorithm because every subtask will have to wait for the other one to complete before starting.

· The invoke method of a ForkJoinPool shouldn’t be used from within a RecursiveTask. Instead, you should always call the methods compute or fork directly; only sequential code should use invoke to begin parallel computation.

· Calling the fork method on a subtask is the way to schedule it on the ForkJoinPool. It might seem natural to invoke it on both the left and right subtasks, but this is less efficient than just directly calling compute on one of them. Doing this allows you to reuse the same thread for one of the two subtasks and avoid the overhead caused by the unnecessary allocation of a further task on the pool.

· Debugging a parallel computation using the fork/join framework can be tricky. In particular, it’s ordinarily quite common to browse a stack trace in your favorite IDE to discover the cause of a problem, but this can’t work with a fork-join computation because the call to computeoccurs in a different thread than the conceptual caller, which is the code that called fork.

· As you’ve discovered with parallel streams, you should never take for granted that a computation using the fork/join framework on a multicore processor is faster than the sequential counterpart. We already said that a task should be decomposable into several independent subtasks in order to be parallelizable with a relevant performance gain. All of these subtasks should take longer to execute than forking a new task; one idiom is to put I/O into one subtask and computation into another, thereby overlapping computation with I/O. Moreover, you should consider other things when comparing the performance of the sequential and parallel versions of the same algorithm. Like any other Java code, the fork/join framework needs to be “warmed up,” or executed, a few times before being optimized by the JIT compiler. This is why it’s always important to run the program multiple times before to measure its performance, as we did in our harness. Also be aware that optimizations built into the compiler could unfairly give an advantage to the sequential version (for example, by performing dead code analysis—removing a computation that’s never used).

The fork/join splitting strategy deserves one last note: you must choose the criteria used to decide if a given subtask should be further split or is small enough to be evaluated sequentially. We give some hints about this in the next section.

7.2.3. Work stealing

In our ForkJoinSumCalculator example we decided to stop creating more subtasks when the array of numbers to be summed contained at most 10,000 items. This is an arbitrary choice, but in most cases it’s difficult to find a good heuristic, other than trying to optimize it by making several attempts with different inputs. In our test case, we started with an array of 10 million items, meaning that the ForkJoinSumCalculator will fork at least 1,000 subtasks. This might seem like a waste of resources because we ran it on a machine that has only four cores. In this specific case, that’s probably true because all tasks are CPU bound and are expected to take a similar amount of time.

But forking a quite large number of fine-grained tasks is in general a winning choice. This is because ideally you want to partition the workload of a parallelized task in such a way that each subtask takes exactly the same amount of time, keeping all the cores of your CPU equally busy. Unfortunately, especially in cases closer to real-world scenarios than the straightforward example we presented here, the time taken by each subtask can dramatically vary either due to the use of an inefficient partition strategy or because of unpredictable causes like slow access to the disk or the need to coordinate the execution with external services.

The fork/join framework works around this problem with a technique called work stealing. In practice, this means that the tasks are more or less evenly divided on all the threads in the ForkJoinPool. Each of these threads holds a doubly linked queue of the tasks assigned to it, and as soon as it completes a task it pulls another one from the head of the queue and starts executing it. For the reasons we listed previously, one thread might complete all the tasks assigned to it much faster than the others, which means its queue will become empty while the other threads are still pretty busy. In this case, instead of becoming idle, the thread randomly chooses a queue of a different thread and “steals” a task, taking it from the tail of the queue. This process continues until all the tasks are executed, and then all the queues become empty. That’s why having many smaller tasks, instead of only a few bigger ones, can help in better balancing the workload among the worker threads.

More generally, this work-stealing algorithm is used to redistribute and balance the tasks among the worker threads in the pool. Figure 7.5 shows how this process occurs. When a task in the queue of a worker is divided into two subtasks, one of the two subtasks is stolen by another idle worker. As described previously, this process can continue recursively until the condition used to define that a given subtask should be executed sequentially becomes true.

Figure 7.5. The work-stealing algorithm used by the fork/join framework

It should now be clear how a stream can use the fork/join framework to process its items in parallel, but there’s still one missing ingredient. In this section, we analyzed an example where you explicitly developed the logic to split an array of numbers into multiple tasks. Nevertheless, you didn’t have to do anything similar when you used the parallel streams at the beginning of this chapter, and this means that there must be an automatic mechanism splitting the stream for you. This new automatic mechanism is called the Spliterator, and we explore it in the next section.

7.3. Spliterator

The Spliterator is another new interface added to Java 8; its name stands for “splitable iterator.” Like Iterators, Spliterators are used to traverse the elements of a source, but they’re also designed to do this in parallel. Although you may not have to develop your ownSpliterator in practice, understanding how to do so will give you a wider understanding about how parallel streams work. Java 8 already provides a default Spliterator implementation for all the data structures included in its Collections Framework. Collections now implements the interface Spliterator, which provides a method spliterator. This interface defines several methods, as shown in the following listing.

Listing 7.3. The Spliterator interface

public interface Spliterator<T> {

boolean tryAdvance(Consumer<? super T> action);

Spliterator<T> trySplit();

long estimateSize();

int characteristics();

}

As usual, T is the type of the elements traversed by the Spliterator. The tryAdvance method behaves in a way similar to a normal Iterator in the sense that it’s used to sequentially consume the elements of the Spliterator one by one, returning true if there are still other elements to be traversed. But the trySplit method is more specific to the Spliterator interface because it’s used to partition off some of its elements to a second Spliterator (the one returned by the method), allowing the two to be processed in parallel. A Spliterator may also provide an estimation of the number of the elements remaining to be traversed via its estimateSize method, because even an inaccurate but quick-to-compute value can be useful to split the structure more or less evenly.

It’s important to understand how this splitting process is performed internally in order to take control of it when required. Therefore, we analyze it in more detail in the next section.

7.3.1. The splitting process

The algorithm that splits a Stream into multiple parts is a recursive process and proceeds as shown in figure 7.6. In the first step trySplit is invoked on the first Spliterator and generates a second one. Then in step 2 it’s called again on these two Spliterators, which results in a total of four. The framework keeps invoking the method trySplit on a Spliterator until it returns null to signal that the data structure that it’s processing is no longer divisible, as shown in step 3. Finally, this recursive splitting process terminates in step 4 when all Spliterators have returned null to a trySplit invocation.

Figure 7.6. The recursive splitting process

This splitting process can also be influenced by the characteristics of the Spliterator itself, which are declared via the characteristics method.

The Spliterator characteristics

The last abstract method declared by the Spliterator interface is characteristics, which returns an int encoding the set of characteristics of the Spliterator itself. The Spliterator clients can use these characteristics to better control and optimize its usage. Table 7.2summarizes them. (Unfortunately, although these conceptually overlap with characteristics of a collector, they’re coded differently.)

Table 7.2. Spliterator’s characteristics

Characteristic

Meaning

ORDERED

Elements have a defined order (for example, a List), so the Spliterator enforces this order when traversing and partitioning them.

DISTINCT

For each pair of traversed elements x and y, x.equals(y) returns false.

SORTED

The traversed elements follow a predefined sort order.

SIZED

This Spliterator has been created from a source with a known size (for example, a Set), so the value returned by estimatedSize() is precise.

NONNULL

It’s guaranteed that the traversed elements won’t be null.

IMMUTABLE

The source of this Spliterator can’t be modified. This implies that no elements can be added, removed, or modified during their traversal.

CONCURRENT

The source of this Spliterator may be safely concurrently modified by other threads without any synchronization.

SUBSIZED

Both this Spliterator and all further Spliterators resulting from its split are SIZED.

Now that you’ve seen what the Spliterator interface is and which methods it defines, you can try to develop your own implementation of a Spliterator.

7.3.2. Implementing your own Spliterator

Let’s look at a practical example of where you might need to implement your own Spliterator. We’ll develop a simple method that counts the number of words in a String. An iterative version of this method could be written as shown in the following listing.

Listing 7.4. An iterative word counter method

Let’s put this method to work on the first sentence of Dante’s Inferno:[1]

1 See http://en.wikipedia.org/wiki/Inferno_(Dante).

final String SENTENCE =

" Nel mezzo del cammin di nostra vita " +

"mi ritrovai in una selva oscura" +

" ché la dritta via era smarrita ";

System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");

Note that we added some additional random spaces in the sentence to demonstrate that the iterative implementation is working correctly even in the presence of multiple spaces between two words. As expected, this code prints out the following:

Found 19 words

Ideally you’d like to achieve the same result in a more functional style because this way you’ll be able, as shown previously, to parallelize this process using a parallel Stream without having to explicitly deal with threads and their synchronization.

Rewriting the WordCounter in functional style

First, you need to convert the String into a stream. Unfortunately, there are primitive streams only for int, long, and double, so you’ll have to use a Stream<Character>:

Stream<Character> stream = IntStream.range(0, SENTENCE.length())

.mapToObj(SENTENCE::charAt);

You can calculate the number of words by performing a reduction on this stream. While reducing the stream, you’ll have to carry a state consisting of two variables: an int counting the number of words found so far and a boolean to remember if the last-encountered Character was a space or not. Because Java doesn’t have tuples (a construct to represent an ordered list of heterogeneous elements without the need of a wrapper object), you’ll have to create a new class, WordCounter, which will encapsulate this state as shown in the following listing.

Listing 7.5. A class to count words while traversing a stream of Characters

In this listing, the accumulate method defines how to change the state of the WordCounter, or more precisely with which state to create a new WordCounter because it’s an immutable class. The method accumulate is called whenever a new Character of the Stream is traversed. In particular, as you did in the countWordsIteratively method in listing 7.4, the counter is incremented when a new nonspace is met and the last character encountered is a space. Figure 7.7 shows the state transitions of the WordCounter when a new Character is traversed by theaccumulate method.

Figure 7.7. The state transitions of the WordCounter when a new Character c is traversed

The second method, combine, is invoked to aggregate the partial results of two WordCounters operating on two different subparts of the stream of Characters, so it combines two WordCounters by summing their internal counters.

Now that you’ve encoded the logic of how to accumulate characters on a WordCounter and how to combine them in the WordCounter itself, writing a method that will reduce the stream of Characters is straightforward:

private int countWords(Stream<Character> stream) {

WordCounter wordCounter = stream.reduce(new WordCounter(0, true),

WordCounter::accumulate,

WordCounter::combine);

return wordCounter.getCounter();

}

Now you can try this method with the stream created from the String containing the first sentence of Dante’s Inferno:

Stream<Character> stream = IntStream.range(0, SENTENCE.length())

.mapToObj(SENTENCE::charAt);

System.out.println("Found " + countWords(stream) + " words");

You can check that its output corresponds with the one generated by the iterative version:

Found 19 words

So far, so good, but we said that one of the main reasons for implementing the WordCounter in functional terms was to be able to easily parallelize this operation, so let’s see how this works.

Making the WordCounter work in parallel

You could try to speed up the word-counting operation using a parallel stream, as follows:

System.out.println("Found " + countWords(stream.parallel()) + " words");

Unfortunately, this time the output is

Found 25 words

Evidently something has gone wrong, but what? The problem isn’t hard to discover. Because the original String is split at arbitrary positions, sometimes a word is divided in two and then counted twice. In general, this demonstrates that going from a sequential stream to a parallel one can lead to a wrong result if this result may be affected by the position where the stream is split.

How can you fix this issue? The solution consists of ensuring that the String isn’t split at a random position but only at the end of a word. To do this, you’ll have to implement a Spliterator of Character that splits a String only between two words, as shown in the following listing, and then creates the parallel stream from it.

Listing 7.6. The WordCounterSpliterator

This Spliterator is created from the String to be parsed and iterates over its Characters by holding the index of the one currently being traversed. Let’s quickly revisit the methods of the WordCounterSpliterator implementing the Spliterator interface:

· The tryAdvance method feeds the Consumer with the Character in the String at the current index position and increments this position. The Consumer passed as argument is an internal Java class forwarding the consumed Character to the set of functions that have to be applied to it while traversing the stream, which in this case is only a reducing function, namely, the accumulate method of the WordCounter class. The tryAdvance method returns true if the new cursor position is less than the total String length and there are furtherCharacters to be iterated.

· The trySplit method is the most important one in a Spliterator because it’s the one defining the logic used to split the data structure to be iterated. As you did in the compute method of the RecursiveTask implemented in listing 7.1 (on how to use the fork/join framework), the first thing you have to do here is set a limit under which you don’t want to perform further splits. Here, you use a very low limit of 10 Characters only to make sure that your program will perform some splits with the relatively short String you’re parsing, but in real-world applications you’ll have to use a higher limit, as you did in the fork/join example, to avoid creating too many tasks. If the number of remaining Characters to be traversed is under this limit, you return null to signal that no further split is necessary. Conversely, if you need to perform a split, you set the candidate split position to the half of the String chunk remaining to be parsed. But you don’t use this split position directly because you want to avoid splitting in the middle of a word, so you move forward until you find a blank Character. Once you find an opportune split position, you create a new Spliterator that will traverse the substring chunk going from the current position to the split one; you set the current position of this to the split one, because the part before it will be managed by the new Spliterator, and then you return it.

· The estimatedSize of elements still to be traversed is the difference between the total length of the String parsed by this Spliterator and the position currently iterated.

· Finally, the characteristic method signals to the framework that this Spliterator is ORDERED (the order is just the sequence of Characters in the String), SIZED (the value returned by the estimatedSize method is exact), SUBSIZED (the other Spliterators created by the trySplit method also have an exact size), NONNULL (there can be no null Characters in the String), and IMMUTABLE (no further Characters can be added while parsing the String because the String itself is an immutable class).

Putting the WordCounterSpliterator to work

You can now use a parallel stream with this new WordCounterSpliterator as follows:

Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);

Stream<Character> stream = StreamSupport.stream(spliterator, true);

The second boolean argument passed to the StreamSupport.stream factory method means that you want to create a parallel stream. Passing this parallel stream to the countWords method

System.out.println("Found " + countWords(stream) + " words");

produces the correct output, as expected:

Found 19 words

You’ve seen how a Spliterator can let you to gain control over the policy used to split a data structure. One last notable feature of Spliterators is the possibility of binding the source of the elements to be traversed at the point of first traversal, first split, or first query for estimated size, rather than at the time of its creation. When this happens, it’s called a late-binding Spliterator. We’ve dedicated appendix C to showing how you can develop a utility class capable of performing multiple operations on the same stream in parallel using this feature.

7.4. Summary

In this chapter, you’ve learned the following:

· Internal iteration allows you to process a stream in parallel without the need to explicitly use and coordinate different threads in your code.

· Even if processing a stream in parallel is so easy, there’s no guarantee that doing so will make your programs run faster under all circumstances. Behavior and performance of parallel software can sometimes be counterintuitive, and for this reason it’s always necessary to measure them and be sure that you’re not actually slowing your programs down.

· Parallel execution of an operation on a set of data, as done by a parallel stream, can provide a performance boost, especially when the number of elements to be processed is huge or the processing of each single element is particularly time consuming.

· From a performance point of view, using the right data structure, for instance, employing primitive streams instead of nonspecialized ones whenever possible, is almost always more important than trying to parallelize some operations.

· The fork/join framework lets you recursively split a parallelizable task into smaller tasks, execute them on different threads, and then combine the results of each subtask in order to produce the overall result.

· Spliterators define how a parallel stream can split the data it traverses.