CHAPTER 4. Ending Streams: Collection and Reduction

Pipelines are ended by terminal operations, which we saw in Chapter 3 are divided into three groups: search operations, reductions, and operations with side-effects. Although we surveyed all three groups then, there is much more to say about reductions; this chapter takes up that story.

Broadly speaking, reductions are operations which return a single value that in some way summarizes the values of the stream elements. But this description does not make a distinction—very important to Java programmers—between summarizing operations that create new objects and those that change their operands. Traditional ideas about reduction have been developed for languages supporting immutability; although modern Java programming practice encourages immutability in general, most pipelines will end in mutable collections. So collection, also called mutable reduction, is the most important kind of terminal operation in the Stream API. Collection is a generalization of classical reduction, which in Java programming will be useful mainly with primitive streams. Collection is like reduction in the sense that it summarizes values of stream elements into a single object, but it does so via mutation.

Here is a very simple comparison between traditional Java bulk processing code and collection. If we allow a real-life data source to be modeled by an Iterable<Book>, declared here as library, we would usually accumulate values from it to a List<Book> like this:
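    List<Book> bookList = new ArrayList<>();
    for (Book b : library) {
        bookList.add(b);
    }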

In Java 8, now modeling the data source by a Stream<Book>, we could get a similar effect by writing
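    List<Book> bookList = library.stream()      // library assumed to be a Collection<Book>
        .collect(Collectors.toList());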

The collector version has a number of advantages, beyond the obvious improvement in conciseness and readability: the stream operations can be safely executed in parallel, even if the List into which the stream elements are accumulated (ArrayList is used in the current implementation) is not threadsafe. Further, the collector pattern turns out to be very flexible, and collectors are easily composed: Figure 4-1 provides a preview of examples developed later in this chapter to give an advance idea of this flexibility.

FIGURE 4-1. Examples of Stream operations from this chapter

In the imperative version, the key components are the creation of the container (new ArrayList <>()) and the accumulation of elements into it (bookList.add(b)). Collectors also have components for these two tasks, called, respectively, the supplier and the accumulator. We can identify these components on the diagrams introduced in Chapter 3. Figure 4-2 shows the relationship between Figure 3-5 (p. 68), the diagram for the collector created by Collectors.toSet, and these two components.

FIGURE 4-2. The components of Collectors.toSet

The supplier defines the container type, always shown shaded in these diagrams (in Figure 4-2, a framework-chosen implementation of Set<Book>). The accumulator populates the container from the incoming stream; in Figure 4-2, it is Set::add.

4.1 Using Collectors

Later sections of this chapter explore the idea of the collector pattern in depth, and discuss when and how you might want to create a custom collector. But before that, we should consider the most common and straightforward occurrence of the pattern in everyday programming: namely, the use of predefined collectors supplied by the factory methods of the Collectors class. These can be divided into two types: stand-alone collectors, useful in their own right, and collectors designed for use in composition with others.

4.1.1 Stand-alone Predefined Collectors

Stand-alone collectors can be divided into three groups, depending on their function:

• To accumulate to framework-supplied containers.

• To accumulate to custom collections.

• To accumulate elements into a classification map.

We already met most of the first group in Chapter 3 (p. 67ff)—those for which the supplier is the constructor for a framework-supplied collection: for toSet it is an implementation of Set, for toList of List, and for the two overloads of toMap it is an implementation of Map. The accumulators for these collectors are also obvious: for the Collection implementations it is add, and for the toMap collectors, put.

The remaining method in this group is joining, which returns a collector that concatenates a stream of String objects into a StringBuilder whose contents are then returned as a String (this is further explained in §4.3.1):
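    static Collector<CharSequence,?,String> joining()
    static Collector<CharSequence,?,String> joining(CharSequence delimiter)
    static Collector<CharSequence,?,String> joining(CharSequence delimiter,
                                                    CharSequence prefix,
                                                    CharSequence suffix)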

The first overload of joining simply concatenates the strings in its input stream. The second overload accepts a CharSequence, inserting it as a delimiter between its input strings. For example, here is code to concatenate the titles of all books in my library, separating the titles by a double colon:
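    String concatenatedTitles = library.stream()
        .map(Book::getTitle)                    // a getTitle accessor is assumed
        .collect(Collectors.joining("::"));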

The third overload accepts a delimiter, a prefix, and a suffix: for example, for a book b, the following code will produce a string concatenating the book’s authors, separated by commas, beginning with the book’s title and ending with a newline:
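    String authorsLine = b.getAuthors().stream()   // getAuthors(): a List<String> is assumed
        .collect(Collectors.joining(", ", b.getTitle() + ": ", "\n"));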

We could use this code to create a list of strings, each containing all the authors’ names for a single book:
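    List<String> authorsForBooks = library.stream()
        .map(b -> b.getAuthors().stream()
            .collect(Collectors.joining(", ", b.getTitle() + ": ", "\n")))
        .collect(Collectors.toList());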

The collectors we have seen so far all accumulate to a collection chosen by the framework. Many Collectors methods have variants that let you specify a supplier for the container:
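    static <T,K,U,M extends Map<K,U>> Collector<T,?,M> toMap(
        Function<? super T,? extends K> keyMapper,
        Function<? super T,? extends U> valueMapper,
        BinaryOperator<U> mergeFunction,
        Supplier<M> mapSupplier)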

For example, on page 69 we saw toMap used with a merge function to construct a map from book titles to the date of the latest edition. If the contents of the map will subsequently be required in alphabetic order of titles, placing them into a sorted map might improve performance:
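    Map<String,Year> titleToPubDate = library.stream()
        .collect(Collectors.toMap(
            Book::getTitle,
            Book::getPubDate,                  // getPubDate(): java.time.Year is assumed
            (x, y) -> x.isAfter(y) ? x : y,    // keep the later edition
            TreeMap::new));                    // sorted map, ordered by title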

Since the merge function is actually choosing the greater of two values of java.time.Year using its natural order, it can be replaced by
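    BinaryOperator.maxBy(Comparator.naturalOrder())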

The three overloads of toMap that we have now seen are increasingly general: the first one we met would accept only key- and value-extracting functions. The second accepts in addition a merge function, and this third one also accepts a supplier. This means that if you want to specify a supplier, you must also provide a merge function. But if you want to, you can re-create the behavior of the simple overload of toMap in the presence of duplicate keys by supplying
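    (x, y) -> { throw new IllegalStateException(); }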

as the third argument to this overload.

You might expect that corresponding overloads of toList and toSet would be provided to create collectors capable of allowing the specification of custom suppliers. In fact, rather than providing both toList and toSet with an extra overload, a single more general method toCollection has been provided instead. This is more versatile: it allows us to choose not only arbitrary implementations of Set and List, but of any subinterface of Collection. For example, we can collect stream elements into a sorted set or into a blocking queue:
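    NavigableSet<Book> sortedBooks = library.stream()
        .collect(Collectors.toCollection(
            () -> new TreeSet<>(Comparator.comparing(Book::getTitle))));

    BlockingQueue<Book> bookQueue = library.stream()
        .collect(Collectors.toCollection(LinkedBlockingQueue::new));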

A third group of Collectors methods returns collectors with the function of classifying stream elements. They are related to the collectors returned by toMap, with the difference that instead of using a value-extracting function, the values they place in the map are the elements themselves—or rather, lists of the elements, one List corresponding to each classification key:
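    static <T,K> Collector<T,?,Map<K,List<T>>> groupingBy(
        Function<? super T,? extends K> classifier)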

For example, I could classify my books by topic as follows:
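    Map<Topic,List<Book>> booksByTopic = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic));   // a Topic-valued getTopic accessor is assumed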

Figure 4-3 shows how this example works: for each incoming element, the classifier function is called to determine the key. If the key is not present, it is added, with the value being a singleton List containing only the current element. If the key and a List value are already present, the stream element is added to the list. The supplier for this collector is the no-arguments constructor for a framework-chosen implementation of Map<Topic,List<Book>>, and the accumulator has the function of adding each incoming element as just described. Both of these defaults can be overridden, as we shall see in the next section.

FIGURE 4-3. How Collectors.groupingBy works

A variant of groupingBy is the convenience method partitioningBy, in which the key type K is specialized to Boolean:
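    static <T> Collector<T,?,Map<Boolean,List<T>>> partitioningBy(
        Predicate<? super T> predicate)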

For example, this code would map true to a list of my fiction books and false to a list of the non-fiction ones:
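    Map<Boolean,List<Book>> fictionOrNot = library.stream()
        .collect(Collectors.partitioningBy(
            b -> b.getTopic() == Topic.FICTION));   // a FICTION constant is assumed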

Figure 4-4 shows this example in action.

FIGURE 4-4. How Collectors.partitioningBy works

4.1.2 Composing Collectors

Stand-alone collectors are useful in themselves, but the real power of the Collector API lies in the way that the collectors are designed to work together and with other functions. The importance of composition in the design of Java 8 was emphasized at the beginning of Chapter 3; the Collector API provides a good example of this design principle in action.

For example, suppose we want to represent the distribution of a set of values, say the number of books in my library on each topic. In the last section, we learned about classifying collectors, which can indeed produce mappings in which the key is a property such as the topic of a book. But the type of the value of these mappings has so far been fixed as a list of the stream elements. To represent a distribution, this list needs to be replaced with an element count. This is only one example of values that can replace a list of elements in the mapping: other possibilities include a list of derived properties of the elements (the publishers of those books, say), or other reductions over the element values (the oldest in the topic, the sum of the pages, etc.). Rather than provide specialized classifying collectors for these various use cases, the Collector API provides an extension point by allowing collectors to be composed together.

Composition allows new collectors to be created by combining the effects of two or more collectors or other operations. The most important form it takes is to allow groupingBy to be combined with a second, “downstream,” collector. In this composition, groupingBy provides the classifier function and the classification keys, and the elements associated with a given key are forwarded to the downstream collector for further summarization. Figure 4-5 shows how this works for the simple groupingBy overload that we already saw, in which the default downstream collector is the one returned by Collectors.toList.

FIGURE 4-5. groupingBy is a composition.

A second Collectors factory method is provided to allow the creation of a classifying collector composed with a user-supplied downstream collector:
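    static <T,K,A,D> Collector<T,?,Map<K,D>> groupingBy(
        Function<? super T,? extends K> classifier,
        Collector<? super T,A,D> downstream)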

The behavior we have seen so far from groupingBy collectors is equivalent to using this overload to write:
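    Map<Topic,List<Book>> booksByTopic = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic, Collectors.toList()));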

Returning to the example of representing a distribution, we can now see that the problem can be solved by composing groupingBy with a different downstream collector, one that counts the incoming elements. And, in fact, there is a Collectors factory method for exactly this purpose: Collectors.counting. (Its implementation is explained in §4.4.3.) Figure 4-6 shows how it will work in this example.

FIGURE 4-6. Downstream to counting

So to represent a distribution we can write
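    Map<Topic,Long> distribution = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic, Collectors.counting()));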

Other collectors can be used downstream of groupingBy. In fact, many of the Collectors factory methods are designed for exactly this purpose. We can distinguish them from the stand-alone collectors of §4.1.1 by their duality with convenience reductions; for example, the collector returned by counting is the terminal operation count adapted for use as a downstream collector. For other examples:

• Corresponding to the terminal operations max and min are the Collectors factory methods maxBy and minBy. For example, we could create a mapping that would contain, for each topic, the book with the greatest number of authors:
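    Map<Topic,Optional<Book>> mostAuthors = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic,
            Collectors.maxBy(
                Comparator.comparingInt((Book b) -> b.getAuthors().size()))));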

• Corresponding to the primitive stream terminal operations sum and average are the collectors returned by summingInt, summingLong, summingDouble, and their averaging analogs. For example, we could create a mapping that would contain for each topic the total number of volumes on that topic (recall that getPageCounts returns an array of int values equal in length to the number of volumes for that book):
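    Map<Topic,Integer> volumesByTopic = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic,
            Collectors.summingInt(b -> b.getPageCounts().length)));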

or one that would contain the average height of the books, by topic:
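    Map<Topic,Double> averageHeightByTopic = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic,
            Collectors.averagingDouble(Book::getHeight)));   // a getHeight accessor is assumed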

• Corresponding to the terminal operation summaryStatistics are collectors returned by summarizingInt, summarizingLong, and summarizingDouble. These collect primitive values into the corresponding SummaryStatistics object. For example, the following code will produce, instead of only the total count of book volumes by topic as in the last example, an IntSummaryStatistics instance for the stream of volume counts:
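    Map<Topic,IntSummaryStatistics> volumeStats = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic,
            Collectors.summarizingInt(b -> b.getPageCounts().length)));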

which we can print out using
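    volumeStats.forEach((topic, stats) ->
        System.out.println(topic + ": " + stats));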

to get a result like this:
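    COMPUTING: IntSummaryStatistics{count=6, sum=9, min=1, average=1.500000, max=3}

(The counts shown here are illustrative; the format is the standard toString of IntSummaryStatistics.)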

• Corresponding to the three overloads of the terminal operation reduce are collectors returned by three overloads of the reducing method. We will return to these when we explore the topic of reduction at the end of this chapter.

The preceding examples all have something in common: the downstream collector—whether it is counting elements, sorting them by comparing a property, or summarizing a property—receives and has to handle the entire stream element. But sometimes a downstream collector needs to work on only a single property; for example, say that I want to create a mapping from each topic to a concatenation of all the titles in that topic. The operation of extracting the title from each book requires something very like the map operation on streams, but in this case applied to the stream of objects dispatched by groupingBy to a downstream collector before it receives them. This requirement is met by collectors created by Collectors.mapping, which allow the user to supply both the mapping and the downstream collector:
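    static <T,U,A,R> Collector<T,?,R> mapping(
        Function<? super T,? extends U> mapper,
        Collector<? super U,A,R> downstream)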

Figure 4-7 shows how collectors created by this method work. With the help of Collectors.mapping, it is straightforward to create a mapping from each topic to the concatenation of all titles on that topic:
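    Map<Topic,String> concatenatedTitles = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic,
            Collectors.mapping(Book::getTitle,
                Collectors.joining(";"))));    // ";" is an arbitrary choice of delimiter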

FIGURE 4-7. groupingBy composed with mapping

As a kind of dual to mapping, which applies a function to incoming values before they are collected, collectingAndThen accepts a function that it applies to the container after collection. This “finishing” operation is described further in §4.3.1.

4.1.3 Chaining Pipelines

The techniques of the last section provide different ways of processing the values for each classification key in isolation—finding a maximum value among them, concatenating the values of a string property, and so on. Some problems require a further processing stage, in which the values of different keys are processed together. Take, for example, the problem of finding the most popular topic in my library—that is, the one with the greatest number of books. We know how to produce a mapping from topic to book count:
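    Map<Topic,Long> distribution = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic, Collectors.counting()));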

but now the individual entries in the Map<Topic,Long> have to be compared to find the one with the greatest value. Before this comparison can take place, all the elements in the library must have been processed by the collector to create the classification mapping; only then can the topic-count pairs be compared to find the one with the greatest count value. To be able to apply the operations of the Stream API to the next stage of processing, we need to start another stream having these pairs as elements:
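    Stream<Map.Entry<Topic,Long>> entries = distribution.entrySet().stream();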

We can now work with the components of individual Map.Entry objects using the methods Map.Entry.getKey and Map.Entry.getValue. Java 8 added the comparator-yielding methods comparingByKey and comparingByValue for exactly this kind of situation, so for the problem of finding the most popular topic we can write:
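    Optional<Topic> mostPopular = distribution.entrySet().stream()
        .max(Map.Entry.comparingByValue())
        .map(Map.Entry::getKey);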

We can imitate the fluent style of pipeline processing by chaining the two pipelines together:
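    Optional<Topic> mostPopular = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic, Collectors.counting()))
        .entrySet().stream()
        .max(Map.Entry.comparingByValue())
        .map(Map.Entry::getKey);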

At this early stage in the adoption of Java 8, it’s not possible to predict in detail how idioms will evolve. This one has advantages and disadvantages:

• It is concise and readable.

• It can be deceptive, if you aren’t alert to the fact that optimizations can only be applied to the two pipelines separately and that sufficient memory will be required to hold the intermediate collection. In this, it is not actually so different from the behavior of some intermediate operations, such as sorted, which have to collect the entire contents of the stream internally before proceeding to the next step. We’ll explore this issue further in Chapter 6.

Performance issues notwithstanding, adoption of this idiom will depend on its overall effect on readability and maintainability of code. On that basis, this book will continue to use it.

4.1.4 Worked Example: Most Popular Topics

At the end of the last section, we saw how to find the most popular topic. But that code did not take account of the possibility that there might be more than one topic with the maximum number of books and, if so, that we might want to know about all of them. In this section, we will work toward a solution of that problem; rather than simply presenting it, we will approach it in stages. At each stage, stop and think for a moment about how to go forward. To begin with, try to work out an overall approach to the problem.

______________________________________________________

One way of thinking about problems like this is to work backward from the goal. The last important step of the new program will also use max, as before, but the result will now include all the most popular topics instead of only one of them. That suggests they will need to be in a collection and—if this program is similar to the preceding one, so that we are seeking a maximum among map entries—leads us to look for a way of creating a Map<Long,Set<Topic>> (either Set or List would be a reasonable choice), in which the keys are the popularity of each topic and the value is a collection of the topics with that popularity. Following this line of reasoning, we are aiming for a mapping like this—call it targetMap:
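    {1=[HISTORY], 3=[COMPUTING, FICTION]}

(The counts and topics shown are illustrative.)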

And our starting point in getting to it can be the map produced by the first two lines of the previous code:
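    Map<Topic,Long> startMap = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic, Collectors.counting()));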

Call this startMap. How can we get from startMap to targetMap? If you have not found a solution, stop now and try to devise one. Hint: the easiest way to get between them is by using groupingBy.

______________________________________________________

If the goal of a groupingBy operation is targetMap, then the keys that the classifier function is extracting must be the popularities, and since they are the values in startMap, the classifier function must be Map.Entry.getValue.

The values in targetMap come from the keys of the startMap entries, so the action of the collector downstream of groupingBy must be first, to extract the keys from the incoming entries, and second, to accumulate them into a Set. This suggests composing groupingBy with a mapping collector to extract the keys, that collector itself being composed with a toSet collector to accumulate them:
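    Map<Long,Set<Topic>> targetMap = startMap.entrySet().stream()
        .collect(Collectors.groupingBy(
            Map.Entry::getValue,
            Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));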

Putting everything together, the code to find the most popular topics is:
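    Optional<Set<Topic>> mostPopularTopics = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic, Collectors.counting()))
        .entrySet().stream()
        .collect(Collectors.groupingBy(
            Map.Entry::getValue,
            Collectors.mapping(Map.Entry::getKey, Collectors.toSet())))
        .entrySet().stream()
        .max(Map.Entry.comparingByKey())
        .map(Map.Entry::getValue);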

This is by no means the only solution to the problem. You may have considered one of a variety of alternatives:

• Insertion of Book elements into a sorted Map using Topic as the sort key, and retrieving an initial subset of the elements.

• Creation of a frequency map to determine the maximum value, then in a second pass accumulating the topics associated with that value.

• Defining a value object of the kind that we will see later in this chapter, and reducing directly over that.

If you can take the time to explore some of these alternatives, you will be rewarded with an appreciation of the variety of possible strategies available in this style of processing.

4.2 Anatomy of a Collector

There is a straightforward connection between recursive decomposition, as presented in Chapter 1, and collection. Recall Figure 1-2 (p. 14): the merging phase of that diagram shows how two int values can be combined to produce a new one, in that case the maximum of the two. That picture of classical reduction is suitable for an algorithm that always uses combination to produce new values rather than mutating existing ones, but it needs to be modified for collection to mutable containers. Figure 4-8 adapts the earlier picture to show one possible execution of this collector code:
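    List<Book> bookList = library.parallelStream()
        .collect(Collectors.toList());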

FIGURE 4-8. Parallel mutable reduction: toList

The diagram has been drawn assuming a right-to-left flow of stream elements so that they can be shown in the same order in which they appear in the source. What matters, however—at least if the stream is ordered—is that they maintain their relative position while they are being accumulated into the result container.

The conventional way of accumulating values into a collection is to create a new one, then add elements to it successively. The algorithm illustrated in the diagram adapts this inherently sequential process to parallel execution. For each thread, a supplier function creates a new container (typically a collection but, in general, anything that can accumulate references to elements or element values, as StringBuilder does, for example).

Each thread can then accumulate elements into its container, using an accumulator function as in the conventional approach. Finally, the intermediate containers created by the threads must be merged together; this requires a third function, the combiner, which brings together the results of parallel sequential accumulation. We have not needed to learn about combiners until now, because a combiner does not change the functional behavior of a collector from what the accumulator defines (§4.3.3 makes this precise).

If we call the type of the stream element T (here Book) and the type of the result container R (here List<Book>), these three functions are implementations of three interfaces introduced in §2.4:

• The function to create a new container is in this case ArrayList::new;1 in general it is an implementation of Supplier<R>.

• The function to add a single element into an existing container is in this case List::add; in general it is an implementation of BiConsumer<R,T>.

• The function to combine two existing containers is in this case List::addAll; in general it is an implementation of BinaryOperator<R>.

These are the three essential components of a collector. They must work together to give consistent results—for example, it is easy to see that Figure 4-8 is only one of a number of possible executions of the code, all of which must produce the same value. In §4.3.3 we will study the rules that ensure that our custom collectors guarantee this, as the predefined collectors do.

4.2.1 Concurrent Collection

From Figure 4-8, it should be clear that the performance bottleneck in parallel stream processing is liable to be the combine operation that brings together the results of processing on separate threads. The framework guarantees that—provided a collector respects the rules of §4.3.3—accumulator operations on a given partition will only ever be executed by a single thread and that combiner operations will be executed safely, even when non-threadsafe containers are being combined. But clearly there is a performance price to pay for this safety, especially in the common case of map merging. So the framework offers an alternative: concurrent collectors.

A concurrent collector is one that has declared CONCURRENT as one of its characteristics (§6.8). This declaration informs the framework that it can safely call the accumulator function on the same result container from multiple threads. This will improve parallel performance, at the cost of losing ordering on stream elements. So the commonest use case for concurrent collection is collection to a Map, and every toMap method in the Collectors class has a corresponding toConcurrentMap method that produces a collector to a ConcurrentMap. Similarly, each overload of groupingBy has a dual groupingByConcurrent method returning a ConcurrentMap in place of a Map. We discuss the performance improvement that these can bring (and the circumstances under which you should use them) in §6.8.

4.3 Writing a Collector

We have seen the range of capabilities that the library’s predefined collectors provide and, further, how that range can be extended by composing them together. With such a wide choice, why would you want to define your own collector? There are two possible motivations: either you need to accumulate values into a container that does not implement Collection (so that you cannot use Collectors.toCollection), or the process of accumulation requires the sharing of state between values being collected.

For a simple example of the second sort, suppose that we have a stream of values, sorted by some property, and we want to group them according to their proximity measured by that property. A physical example would be a route for a power transmission line, where the tower spacing must be no more than a constant, say MAX_DISTANCE. A potential route consisting of a series of tower sites could be analyzed by grouping the sites in segments, in each of which the towers are separated by no more than MAX_DISTANCE, whereas the distance between two towers in different segments is always greater than MAX_DISTANCE. For a very simple example (Figure 4-9), a route planned exactly along the x-axis might contain these points:
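    1, 2, 4, 7, 9, 11, 14

(The coordinates here are illustrative.)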

If MAX_DISTANCE == 2, these points would be grouped into three segments:
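    [[1, 2, 4], [7, 9, 11], [14]]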

FIGURE 4-9. Transmission tower placements

The obvious representation for the solution to this problem is a nested linear data structure. Since adding a new element will require access to the last existing element, it will be convenient to use Deque rather than List as its basis; Deque has a modern and efficient implementation in ArrayDeque.

Here is a simple iterative solution:
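    // a sketch: Point is assumed to expose distance(Point),
    // and MAX_DISTANCE to be a constant
    Deque<Deque<Point>> grouped = new ArrayDeque<>();
    grouped.add(new ArrayDeque<>());
    for (Point p : route) {
        Deque<Point> lastSegment = grouped.getLast();
        if (!lastSegment.isEmpty()
                && p.distance(lastSegment.getLast()) > MAX_DISTANCE) {
            grouped.add(new ArrayDeque<>());    // too far away: start a new segment
        }
        grouped.getLast().add(p);
    }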

This is a good starting point for the collector implementation—in fact, it can serve with little modification as the basis for the accumulator. The combiner is more of a challenge: its function is to merge two Deque instances, each representing a solution over part of the input. There are two possibilities for the merge, depending on the distance between the last point in the left part and the first point in the right part. If these are sufficiently close, the last left segment and the first right segment must be merged; otherwise the right part can be simply appended to the left.

Here are definitions of the three components—supplier, accumulator, and combiner—of the collector:

• A supplier can often simply be the constructor for the container; it is tempting to write ArrayDeque::new here. But the correct initial value is an empty container, ready for use by the accumulator or combiner. In this case that is a single empty segment:
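    Supplier<Deque<Deque<Point>>> supplier = () -> {
        Deque<Deque<Point>> container = new ArrayDeque<>();
        container.add(new ArrayDeque<>());      // a single empty segment
        return container;
    };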

• The accumulator has the same task as in the sequential code, that of adding a single Point to a partial solution:
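    BiConsumer<Deque<Deque<Point>>,Point> accumulator = (segments, p) -> {
        Deque<Point> last = segments.getLast();
        if (!last.isEmpty() && p.distance(last.getLast()) > MAX_DISTANCE) {
            segments.add(new ArrayDeque<>());   // start a new segment
        }
        segments.getLast().add(p);
    };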

• The combiner merges two partial solutions, as discussed above:
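    BinaryOperator<Deque<Deque<Point>>> combiner = (left, right) -> {
        if (left.getLast().isEmpty()) return right;     // left is still untouched
        if (right.getFirst().isEmpty()) return left;    // right is still untouched
        Point lastLeft = left.getLast().getLast();
        Point firstRight = right.getFirst().getFirst();
        if (firstRight.distance(lastLeft) <= MAX_DISTANCE) {
            left.getLast().addAll(right.removeFirst()); // merge the straddling segments
        }
        left.addAll(right);
        return left;
    };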

Figure 4-10 shows the supplier, the accumulator, and the combiner working together on one possible execution of the sample input from p. 91.

These three functions can be assembled into a Collector using the factory method Collector.of. The Characteristics parameter to Collector.of is a way of supplying performance-related metadata about the collector (§6.8):
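    static <T,R> Collector<T,R,R> of(
        Supplier<R> supplier,
        BiConsumer<R,T> accumulator,
        BinaryOperator<R> combiner,
        Collector.Characteristics... characteristics)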

Now all that is needed to realize Figure 4-10 is to call Collector.of and supply the resulting collector to Stream.collect:
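    Deque<Deque<Point>> segments = route.stream()       // route: a List<Point>
        .collect(Collector.of(supplier, accumulator, combiner));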

FIGURE 4-10. Collection Point instances by proximity

Performance This collector parallelizes well. Its performance characteristics are examined in more detail in §6.8.2, but the discussion can be summarized as follows: executed sequentially, its performance is very close to that of the iterative version shown earlier. When it terminates a stream with no costly intermediate operations, the parallel speedup is a factor of about 1.8 on a 4-core machine; as the intermediate operations become more expensive, the relative parallel speedup improves.

4.3.1 Finishers

The discussion of collector composition mentioned in passing (p. 84) the method collectingAndThen, which accepts a function to be applied to the container after collection is complete:
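    static <T,A,R,RR> Collector<T,A,RR> collectingAndThen(
        Collector<T,A,R> downstream,
        Function<R,RR> finisher)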

This method allows you to provide an additional finishing transformation on the container produced by the collector. For example, to obtain an immutable list of book titles we could adapt the collector returned by toList by writing:
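    List<String> titles = library.stream()
        .map(Book::getTitle)
        .collect(Collectors.collectingAndThen(
            Collectors.toList(),
            Collections::unmodifiableList));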

There are several reasons why a finishing function could be required:

• The intermediate container is of the wrong type to be returned (for example, the StringBuilder used internally in joining).

• Some processing has to be deferred until all the elements have been seen, such as computing an average.

• The result should be returned in a canonical form, for example, by rebalancing a tree.

• The result must be “sealed” in some way before being returned, for example with an unmodifiable or synchronized wrapper.

Some collectors always require a finisher for their operation. An example is provided by joining, implemented by a collector although its result type is String, which—as an immutable type—is unsuitable for collection. (Later we will see a reduction form that could be used instead, but that would perform very poorly in the course of concatenating strings for every accumulation or combination operation.) In fact, the container for joining is the mutable type StringBuilder; conversion to String only takes place in the finishing function, after the reduction is complete. As this finisher is always needed, it should be built into the collector; there is an overload of Collector.of for that purpose:
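    static <T,A,R> Collector<T,A,R> of(
        Supplier<A> supplier,
        BiConsumer<A,T> accumulator,
        BinaryOperator<A> combiner,
        Function<A,R> finisher,
        Collector.Characteristics... characteristics)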

This finally makes sense of the second type parameter of Collector: it is the type of the intermediate container into which accumulation takes place. When we use a collector, we don’t have to concern ourselves with this type, which relates only to the collector’s internal mechanism; and often in writing one, we may just use the output type, as in the example of the previous section. In the next example (§4.3.2), we have a choice between using a finishing function to convert the output of the combiner to the desired type and applying a map operation subsequently.

To visualize a built-in finisher, consider this code to create a stream of strings, each containing all the authors’ names for a single book:
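    Stream<String> authorLines = library.stream()
        .map(b -> b.getAuthors().stream()
            .collect(Collectors.joining(", ", b.getTitle() + ": ", "\n")));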

Figure 4-11 shows successive stages in its execution. In part (a) the characters in the input strings are accumulated in the internal StringBuilder object that is being built up; in part (b) the input stream is exhausted, the finisher function is applied to the intermediate object, and the resulting String is returned.

FIGURE 4-11. How Collectors.joining works

Embedding finishing functions into collectors makes them suitable as downstream collectors to groupingBy in a way that they would not be if the finisher had to be applied separately. For example, consider this code to create a mapping from each letter to the concatenation of all book titles beginning with that letter:
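    Map<Character,String> titlesByInitial = library.stream()
        .map(Book::getTitle)
        .collect(Collectors.groupingBy(
            title -> title.charAt(0),
            Collectors.joining(";")));    // ";" is an arbitrary choice of delimiter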

If the finisher for joining were not part of the collector but had to be applied separately, after the collection by groupingBy had taken place, it would be necessary to iterate over the resulting map, applying the finisher to each value. Instead, the joining finisher is available to be composed into the groupingBy finisher, to be applied by it to each value of the classification mapping.

4.3.2 Worked Example: Finding My Books

This section extends the discussion of collectors with a slightly more complex example, in which we will work out how to compute the position of each book on my shelves, assuming that they are arranged in alphabetical order of title. I know how many pages each book has and how thick each page is, so if I could create a map from each book title to the total page count of its predecessors on the shelf, I could easily calculate the displacement of each book from the start of the shelf. This is another example of a state-sharing problem, but somewhat harder to solve (and more expensive to compute): each value in the map, except the first, depends on its predecessors.

A major difficulty in solving this problem is in opening your mind to different solutions: every Java developer has written many programs that initialize properties like cumulative displacements, then accumulate changes to them by iterating over data like individual page counts. This is such a familiar pattern that it is tempting to jump to the conclusion that any algorithms for such problems must be sequential. Here we want to break down that assumption by investigating an alternative that uses reduction: that way, we should get a better program, focusing on the essentials of the problem, that may also be a faster program, avoiding contention.

The key step in designing a recursive algorithm like this is devising a data structure to hold partial results so that successive applications of the combiner can eventually merge these together into the final output. But the combiner will also need the input corresponding to the partial results, so the data structure must represent both of these together.

Stop reading for a moment and think about the design of this data structure.

______________________________________________________

The partial result for a single book is the title-displacement pair, and the partial input associated with it is the page count for the book. That leads to the definition of a helper class:
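    // a sketch; the field names are illustrative
    class DispRecord {
        final String title;
        final int disp;      // displacement: total page count of predecessors
        final int length;    // this book's own page count
        DispRecord(String title, int disp, int length) {
            this.title = title;
            this.disp = disp;
            this.length = length;
        }
        int totalDisp() { return disp + length; }
    }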

This example will illustrate why there is so much enthusiasm for the (very long-term) project of introducing tuples to the Java language. Value class declarations are verbose and potentially inefficient compared to the implementation of value objects as tuples in other languages. That said, we can at least make the best of a bad job: the ability to define convenience methods like totalDisp is some compensation for having to use a value class.

The problem specifies an unordered map from title to displacement, but the calculation of each DispRecord relies on its predecessor, so an ordered container will be required during the collection process. We’ll use Deque for this purpose, again in order to take advantage of its convenient provision of access to its last element.

We are now ready to write the components of the collector:

• The supplier is easy to define: in this case, all that is needed is to create an empty container, via ArrayDeque::new:
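    Supplier<Deque<DispRecord>> supplier = ArrayDeque::new;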

• The accumulator is only a little more difficult: its task is to add a DispRecord to the end of an existing Deque. Stop again and outline the code for this.

______________________________________________________

The accumulator appends a single Book to a Deque<DispRecord>, calculating its displacement from the last element of the deque by adding that element’s displacement to its page count:
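    BiConsumer<Deque<DispRecord>,Book> accumulator = (records, b) -> {
        int disp = records.isEmpty() ? 0 : records.getLast().totalDisp();
        records.add(new DispRecord(b.getTitle(), disp,
            IntStream.of(b.getPageCounts()).sum()));
    };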

• Now for the combiner. This has the task of merging two Deque<DispRecord> instances. If you have not already worked it out, stop now to write or outline the code.

______________________________________________________

The combiner must increase the displacement field of each of the elements in the second one by the total page count of the elements in the first one. This can be calculated from the last of these, again by adding its displacement to its page count. Then the two collections can be merged:
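    BinaryOperator<Deque<DispRecord>> combiner = (left, right) -> {
        if (left.isEmpty()) return right;
        int shift = left.getLast().totalDisp();     // total page count of the left part
        right.stream()
            .map(dr -> new DispRecord(dr.title, dr.disp + shift, dr.length))
            .forEachOrdered(left::add);
        return left;
    };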

Now the main objective of the problem is achieved—the book displacements are calculated—but the results are not exactly in the form required by the problem specification, which demanded a mapping from book title to displacement. One way of implementing this requirement is to stream the output from the collector we have just defined into a further collector created by Collectors.toMap. The alternative is to add a finisher to the existing collector. Stop for the last time on this problem to think how a suitable finisher could be defined, and what other change would be required.

______________________________________________________

In this case, the work of the finisher is just to create a Map from its input. By the time the finisher is invoked, however, the fork/join threads will have completed work on the combiner, so they should be available for a concurrent map merge:
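    Function<Deque<DispRecord>,Map<String,Integer>> finisher = records ->
        records.parallelStream()
            .collect(Collectors.toConcurrentMap(dr -> dr.title, dr -> dr.disp));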

Figure 4-12 shows the four functions we have defined working together on our three sample books from Chapter 3.

All that is needed to realize Figure 4-12 is to call Collector.of and supply the resulting collector to Stream.collect:
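    Map<String,Integer> displacementMap = library.stream()
        .collect(Collector.of(supplier, accumulator, combiner, finisher));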

FIGURE 4-12. Collecting book displacements

Performance The different factors that contribute to the performance of this program make it an interesting example; §6.8.3 analyzes them in some detail. To briefly summarize that discussion: as it stands, the program is slower than the iterative version, for a number of reasons: it is an example of a prefix sum, in which the value of every element depends on the values of the preceding ones. In a naïve parallel algorithm for prefix sum, like this one, the total cost of the combine operations is proportional to the size of the input data set, regardless of the level of parallelism in use.2 The combine operation used here is quite expensive (unnecessarily so, in fact). A second problem is caused by the expense of the map merge performed by the finisher; this can be mitigated by presizing the map. A third problem is the low per-element workload of the program—in real-life situations, preprocessing before collection will usually be required. Whether these problems make this program unsuitable for parallelization is discussed in §6.8.3.

4.3.3 Rules for Collectors

When a collector is executed in the way pictured in Figure 4-12, a complex interplay takes place between the framework and the collector-supplied components. For this to work correctly, each side must be able to rely on the other obeying certain rules. This section lays those rules down: what a collector can expect from the framework, and the constraints it must itself respect.

The framework guarantees to respect the following conditions:

• New values will appear only as the second argument to the accumulator; all other values will be results previously returned from supplier, accumulator, or combiner.

• Results from the supplier, accumulator, and combiner may be returned to the caller of collect; otherwise, they will only ever be used as arguments to the accumulator, combiner, or finisher.

• Values passed to the combiner or finisher and not returned are never used again; their contents have been processed and should not be reused.

The collector must respect the following constraints:

• Unless it has the characteristic CONCURRENT (see §4.2.1), it must ensure that any result returned from the supplier, accumulator, or combiner functions is thread-confined (i.e., it must not have been made available to any other thread). This enables the collector framework to parallelize processing without being concerned about interference from external threads.

• If it has the characteristic CONCURRENT, the accumulator must be threadsafe, as the framework may call it concurrently from multiple threads on the same concurrently modifiable result container. Concurrent collectors cannot be used when ordering is significant.

• The identity constraint: the empty result container should leave other elements unchanged when combined with them. More formally, for any value of s:
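    // combining any partial result s with an empty container
    // (fresh from the supplier) must leave its contents unchanged:
    combiner.apply(s, supplier.get())    // equivalent to s
    combiner.apply(supplier.get(), s)    // equivalent to s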

• The associativity constraint: splitting the computation in different places should produce the same result. More formally, for any values of q, r, and s:
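    combiner.apply(combiner.apply(q, r), s)
    // must be equivalent to
    combiner.apply(q, combiner.apply(r, s))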

• The compatibility constraint: dividing the computation in different ways between the accumulator and combiner should produce the same result. More formally, for any values of r, s, and t, the same value of r should result from executing the two lines of code on the left as the two on the right:
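    // left                               // right
    accumulator.accept(s, t);             r = combiner.apply(r, s);
    r = combiner.apply(r, s);             accumulator.accept(r, t);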

4.4 Reduction

At the start of this chapter, a comparison between collection and its special case, reduction, came out in favor of collection as more generally useful in Java programs. However, reduction is useful in some circumstances; in this section, we will explore the uses of reduction as implemented by the Stream API.

4.4.1 Reduction over Primitives

We already saw (p. 66) convenience methods provided for special-purpose reductions on primitive streams, including sum, min, max, count, and average. In this section, we will see how these relate to the general capabilities of the reduce method and how it can be used to define new functions over streams of primitives. The examples in this section use IntStream, but the other primitive stream classes have exactly analogous methods.

The basic idea of reduction over primitives follows the same divide-and-conquer approach that led us to identify the three components of a collector from Figure 4-8. Using the form of that figure for a simple reduction over primitives—summation over int values—gives us Figure 4-13, representing the code
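    int total = intStream.sum();    // intStream: an IntStream of the values to be summed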

FIGURE 4-13. Primitive reduction

This figure has two significant differences from the collector diagram:

• A base value—the identity for the reduction—is used in place of empty container instances created by a supplier function in the earlier figure.

• The accumulator and combiner are the same, because only one type is involved.

This helps to clarify the signature of the second overload of IntStream.reduce:
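    int reduce(int identity, IntBinaryOperator op)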

If the convenience method sum were not available, we could write code to implement Figure 4-13 using the second overload of reduce:
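    int total = intStream.reduce(0, (a, b) -> a + b);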

Of the convenience methods that we saw in §3.2.4, sum, count, and average are derived in this way from reduce. New functions can be similarly defined: for example, the following code computes the factorial of a variable intArg:
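    int factorial = IntStream.rangeClosed(1, intArg)
        .reduce(1, (a, b) -> a * b);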

Given an empty stream, this variant of reduce returns the supplied identity. By contrast, the first overload of reduce does not accept an identity and so, given an empty stream, must return an empty OptionalInt. This is the variant used to define the convenience methods max and min. If they were not part of the API, we could use the one-argument version of IntStream.reduce to obtain the same effect. For example:
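    OptionalInt max = intStream.reduce(Math::max);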

One execution of this code is shown in Figure 4-14.

FIGURE 4-14. Primitive reduction without an identity

4.4.2 Reduction over Reference Streams

After collection and reduction over primitives, reduction over reference streams looks quite familiar: two of the overloads of Stream.reduce are similar to reduce on primitive streams, and—perhaps confusingly—the third one has a superficial similarity to collection, although its use is very different:
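    Optional<T> reduce(BinaryOperator<T> accumulator)
    T reduce(T identity, BinaryOperator<T> accumulator)
    <U> U reduce(U identity,
                 BiFunction<U,? super T,U> accumulator,
                 BinaryOperator<U> combiner)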

We now consider each of these three overloads in turn. The first accepts only a combiner without an identity and so, analogously to the single-argument primitive reduce, returns an Optional. It is useful in several different situations: one is where a binary operator, like one created from a Comparator by BinaryOperator.minBy, returns one of its operands. For example, we could find the first book title in alphabetic order by writing
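    Optional<String> firstTitle = library.stream()
        .map(Book::getTitle)
        .reduce(BinaryOperator.minBy(Comparator.naturalOrder()));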

In fact, this is how the convenience reduction method Stream.min (p. 66) is implemented. Execution of this code can be visualized in the same way as the corresponding reduction over primitives (Figure 4-14). A binary operator could also return a newly created object, for example the result of calling one of the binary arithmetic operators on BigInteger or BigDecimal (notice that we choose these types as examples for reduction because they are immutable):
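    Optional<BigInteger> product = bigIntegers.stream()   // a Collection<BigInteger> is assumed
        .reduce(BigInteger::multiply);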

The combiner in all three overloads of reduce must respect the same associativity constraint as for the combiner of a collector and for the same reason: that different executions splitting a computation in different places must nevertheless return the same results. So, for all values of q, r, and s:
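    combiner.apply(combiner.apply(q, r), s)
    // must be equal to
    combiner.apply(q, combiner.apply(r, s))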

The remaining two overloads of Stream.reduce require an identity. The object used for this must not be mutated by the accumulator or combiner, as reduce may reuse the same object repeatedly—in contrast to collection, where the Supplier creates a new object at every call.

The first of these two accepts an identity and a binary operator. We could use it in an alternative to the single-argument overload to sum a Biglnteger stream without needing to return an Optional:
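    BigInteger sum = bigIntegers.stream()
        .reduce(BigInteger.ZERO, BigInteger::add);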

Notice again the similarity of this to reduction over primitives; it is only appropriate with an immutable identity like BigInteger.ZERO. Execution of this code can again be visualized in the same way as the corresponding reduction over primitives (Figure 4-13).

Again, as with collectors, the combiner must respect the identity constraint: given any s and the identity id
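    combiner.apply(id, s)    // must be equal to s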

The third overload introduces an accumulator, and with it the possibility of returning a different type, for example an aggregate of some kind. Here it is used to calculate the total number of volumes in my library (as illustrated in Figure 4-15):
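    int volumes = library.stream().reduce(
        0,
        (count, b) -> count + b.getPageCounts().length,   // accumulator
        Integer::sum);                                    // combiner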

FIGURE 4-15. Long-form reduction

Notice that you can (and normally would) write this code as separate map and reduce operations:
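    int volumes = library.stream()
        .map(b -> b.getPageCounts().length)
        .reduce(0, Integer::sum);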

However, this overload of reduce is provided for those situations in which significant optimization can be achieved by combining mapping and reducing into a single function.

Once again, as with collectors, the accumulator and combiner functions must be compatible, to ensure that all possible executions of a program will give the same result, however the computation is divided between them. The situation is simpler here, however (things are always simpler without mutation!): for all values of r, s, and t, the following equality must hold:
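    combiner.apply(r, accumulator.apply(s, t))
    // must be equal to
    accumulator.apply(combiner.apply(r, s), t)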

To conclude this section, we should reconsider the claim with which the chapter began, that collection was likely to prove more useful than reduction in Java programming with streams. The fact that the overloads with identity cannot accumulate into mutable types definitely makes them less valuable than collection. But the single-argument overload of reduce can serve the same purpose as collection, if it is combined with a preliminary mapping stage. For the purpose of comparison, this section concludes with a reduction solution to the cumulative page count problem already solved by collection (§4.3). Figure 4-16 shows a visualization of the program; compare it with Figure 4-12 to see the difference in the two approaches.

FIGURE 4-16. Cumulative book displacement (map-reduce version)

Figure 4-16 is divided into two parts, labeled “map” and “reduce”. The map section does the work of the supplier and accumulator components of the collection version; first it creates a DispRecord for each Book using a new constructor:
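    DispRecord(Book b) {
        this(b.getTitle(), 0, IntStream.of(b.getPageCounts()).sum());
    }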

then wrapping this in a single-element Deque using a helper method wrap:
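    static <T> Deque<T> wrap(T t) {
        Deque<T> dq = new ArrayDeque<>();
        dq.add(t);
        return dq;
    }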

The reduce section is the same as the combiner in the collection version (p. 99).

Finally, the client code has more work to do: in this version, it must compose the map and reduce stages, as well as explicitly handle the case of an empty stream, rather than delegating everything to the collector as before:
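    // a sketch, reusing wrap and the combiner defined above
    Map<String,Integer> displacementMap = library.stream()
        .map(DispRecord::new)
        .map(dr -> wrap(dr))
        .reduce(combiner)
        .map(dq -> dq.stream()
            .collect(Collectors.toMap(dr -> dr.title, dr -> dr.disp)))
        .orElse(Collections.emptyMap());    // the empty-stream case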

4.4.3 Composing Collectors with Reduction

In §4.1.2 we saw examples of a number of “downstream” collectors—with functions dual to various terminal operations—provided for composition with other collectors. Among them were mentioned three overloads of the reducing method, returning collectors corresponding to the three overloads of Stream.reduce. Now that we understand Stream.reduce, we can return to reconsider them briefly:
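    static <T> Collector<T,?,Optional<T>> reducing(BinaryOperator<T> op)
    static <T> Collector<T,?,T> reducing(T identity, BinaryOperator<T> op)
    static <T,U> Collector<T,?,U> reducing(U identity,
                                           Function<? super T,? extends U> mapper,
                                           BinaryOperator<U> op)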

These collectors are used for the same reasons as their corresponding reduce overloads, but in a downstream context, typically downstream of a groupingBy collector. For example, to find the tallest book in each topic, we could use the first overload:
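    Map<Topic,Optional<Book>> tallestByTopic = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic,
            Collectors.reducing(
                BinaryOperator.maxBy(Comparator.comparingDouble(Book::getHeight)))));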

The three-argument overload is used in a similar way. Notice, however, that instead of an accumulator, this overload expects a Function<T,U>, similar in intent to the mapping stage of the book displacement example of the previous section. For example, I could calculate the number of volumes in each topic of my library using this overload:
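    Map<Topic,Integer> volumesByTopic = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic,
            Collectors.reducing(0, b -> b.getPageCounts().length, Integer::sum)));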

The factory method Collectors.counting is implemented using this overload. For example, if it were not provided, I could count the number of books in each topic of my library like this:
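    Map<Topic,Long> booksPerTopic = library.stream()
        .collect(Collectors.groupingBy(Book::getTopic,
            Collectors.reducing(0L, b -> 1L, Long::sum)));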

4.5 Conclusion

In this chapter we have explored what the Stream API offers for summarizing the results of stream processing. These facilities fall into two categories: reduction and collection. The role of reduction is to summarize immutable values, and the Stream API provides various convenience methods to support it, particularly over primitives.

However, the most important technique is collection, a generalization of reduction that adapts it to concurrent accumulation into mutable collections, managing access even to those that are not threadsafe. We reviewed library collectors for accumulation to standard library collections, to custom collections, and to classification mappings. Since the collector pattern lends itself to composition, the library also contains a number of collectors specifically designed to be composed with others. Further, the API provides extension points to allow the development of custom collectors where necessary, and we have seen examples of situations that call for these and techniques for developing them. Overall, collection is a powerful and flexible tool. Mastering its possibilities is central to becoming expert in using the Stream API.

That concludes our investigation of how streams can be ended; in the next chapter, we will turn our attention to the dual problem of how they can be started.

____________

1The contract for Collectors.toList provides no guarantee on the type of List created, although in Java 8 it is in fact ArrayList.

2Java 8 provided java.util.Arrays with various overloads of a new method parallelPrefix for computing prefix sums efficiently, but this innovation has not yet reached the Stream API.