
CHAPTER 5. Starting Streams: Sources and Spliterators

Chapter 3 briefly introduced the subject of stream sources in general, but the subsequent stream processing examples have all used either collections or stream factory methods for their source. This reflects the expected mainstream usage, but it is now time to explore other aspects of stream creation. In this chapter we will cover

• Stream-bearing methods of the platform classes

• Treatment of exceptions thrown by stream sources and in intermediate operations

• The mechanism by which stream sources work

• An example bringing these topics together

5.1 Creating Streams

After the stream-bearing methods of the Collection interface (see p. 50), perhaps the most important ways of creating streams are the factory methods of the stream interfaces themselves. Of these, Stream.empty and Stream.of were introduced in Chapter 3. The next group consists of iterate and generate:
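    static <T> Stream<T> iterate(T seed, UnaryOperator<T> f)
    static <T> Stream<T> generate(Supplier<T> s)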

The method iterate accepts a seed and a function (represented by the functional interface UnaryOperator) and repeatedly applies the function to produce successive stream elements. Analogous iterate methods are declared by all the primitive streams. For example, to create a sequential ordered IntStream that will deliver alternating values of 1 and -1 indefinitely, we could write:
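    IntStream alternating = IntStream.iterate(1, i -> -i);   // 1, -1, 1, -1, ...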

Each element in the stream is produced by applying the function to the preceding element. Streams produced by iterate are infinite: you get useful results from them only by using an operation that can be applied to a finite initial substream, like limit and the short-circuit “search” operations (p. 63).

The method generate takes a Supplier, representing a function that produces a value without requiring input, and repeatedly calls it to produce successive stream elements. The resulting sequential stream is not ordered, so generate is intended for situations in which each value is drawn from some distribution rather than being computed from its predecessor; for example, it can be used to generate constant streams or streams of random values.
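For example, a minimal sketch (both of these streams are infinite and must be limited downstream):

    Stream<String> echoes = Stream.generate(() -> "echo");           // constant values
    DoubleStream gaussians =
        DoubleStream.generate(new Random()::nextGaussian);           // random values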

Factory methods in the third group create an ordered stream containing a range of values. They are defined only on the integral primitive types IntStream and LongStream. These are the IntStream versions:
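    static IntStream range(int startInclusive, int endExclusive)
    static IntStream rangeClosed(int startInclusive, int endInclusive)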

The API includes both range and rangeClosed in order to accommodate conflicting expectations about whether a range should include or exclude the specified end value. So the same stream can be created in two different ways; for example:
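    IntStream.range(1, 11)         // 1, 2, ..., 10 (end value excluded)
    IntStream.rangeClosed(1, 10)   // 1, 2, ..., 10 (end value included)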

One application of these methods is in the simulation of indexed streams. These are not directly supported in the Stream API but, by mapping elements of IntStream.range and IntStream.rangeClosed against indexed collections or arrays, we can achieve the same effect. Suppose, for example, that I want a listing of the books in my library showing the volume numbers and page counts of each, like this:

I could use IntStream.rangeClosed to achieve this effect:
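    // A sketch only: library (a List of volumes) and getPageCount are
    // assumed names, not the book's originals.
    IntStream.rangeClosed(1, library.size())
        .mapToObj(i -> i + ": " + library.get(i - 1).getPageCount())
        .forEach(System.out::println);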

This technique, of using the elements of an int range as an index, can be extended to multiple data sources, providing a workaround for the absence in the Stream API of an operation to “zip” two streams together.
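For example, here is a minimal sketch of "zipping" two lists xs and ys (hypothetical names) into a single stream of combined values:

    // Pair up elements with the same index; stops at the shorter list.
    IntStream.range(0, Math.min(xs.size(), ys.size()))
        .mapToObj(i -> xs.get(i) + ":" + ys.get(i));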

The last Stream method, not a stream creator but a stream combinator, is the static method concat, which creates a new stream from the concatenation of two existing ones:
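    static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)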

The rest of this section lists the methods that have been added to other platform classes to expose their data for processing in the new style.

java.util.Arrays This class has methods to stream arrays of reference types and each of the primitive stream types. For each stream type there are two overloads, the first one streaming the entire array and the second streaming a slice of the array defined by an inclusive start and exclusive end index.
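For reference, these are the overloads for reference types and for int; long and double have analogous pairs:

    static <T> Stream<T> stream(T[] array)
    static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive)
    static IntStream stream(int[] array)
    static IntStream stream(int[] array, int startInclusive, int endExclusive)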

It appears that the methods accepting an entire array (rather than a slice) overlap in function with the various specializations of Stream.of, whose varargs parameter can be passed an array argument. However, Stream.of is really intended for the case when you want to supply a fixed and known number of arguments, and a call supplying an array is, in effect, arbitrarily interpreted as a series of individual values to be streamed rather than as a single array. Avoid this minor problem by favoring Arrays.stream for array arguments.

java.io.BufferedReader This class reads text files; prior to Java 8, its principal usage was through the method readLine, which returns one line per call. In Java 8, it now also declares the method lines:
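    Stream<String> lines()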

This method populates the stream lazily, by calling BufferedReader.readLine as and when values are required by downstream operations. It is not intended to work in conjunction with any other operation, so the reader cannot be otherwise accessed while the stream’s terminal operation is executing, and no assumptions can be made about its state after the terminal operation has completed. If you want to continue to use the reader after calling lines, you must first reset it to a previously set mark.

The BufferedReader methods called by lines can throw IOException under various error conditions, for example when the reader has been closed. Rather than forcing the caller to handle these checked exceptions, lines wraps them in an UncheckedIOException, which is thrown from the terminal operation that triggered the read. We’ll explore the effects of this in the next section.

In Chapter 6 we will see that among the factors influencing the performance of parallel streams is the concurrency level achievable by the source. The delivery of data from a streaming source like BufferedReader is inherently sequential, so the library strategy for splitting it is to divide it into batches as it becomes available. If reading the data in this way is the most expensive part of processing, then parallelism will provide little benefit. However, if the spliterator can buffer the data before splitting, it may be able to keep parallel streams supplied with data. We’ll see a useful alternative to sequential file reading in the worked example at the end of this chapter.

java.nio.file.Files This class provides operations on files and directories, usually by delegation to the native file system. To maintain consistency with pre-existing methods of the class, these methods throw IOException if a file cannot be opened, but all subsequent IOExceptions are wrapped in an UncheckedIOException, as with BufferedReader.

The first group of four methods produce directory listings in stream form:
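    static Stream<Path> list(Path dir) throws IOException
    static Stream<Path> walk(Path start, FileVisitOption... options)
        throws IOException
    static Stream<Path> walk(Path start, int maxDepth, FileVisitOption... options)
        throws IOException
    static Stream<Path> find(Path start, int maxDepth,
        BiPredicate<Path,BasicFileAttributes> matcher, FileVisitOption... options)
        throws IOException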

The listings produced by walk and find are recursive—that is, they include all subdirectories of the start directory; those produced by list are for the start directory only. The recursive methods take zero or more arguments of the type FileVisitOption, an enum that allows the traversal to be configured, for example by specifying whether symbolic links should be followed. Otherwise, they differ in whether they support a maximum traversal depth and whether they accept a predicate that filters paths for acceptability based on their BasicFileAttributes (such as modification time, access time, or size).

The streams returned from these methods indirectly encapsulate native file handles, so it is good practice to allocate them in a try-with-resources construct. For example, the following code recursively descends the directory tree, starting from the current directory, printing some details about each file:
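    // A sketch: the details printed here (a directory marker and the path)
    // are illustrative, not the book's original output.
    try (Stream<Path> paths = Files.walk(Paths.get("."))) {
        paths.forEach(p -> System.out.println(
            (Files.isDirectory(p) ? "d " : "- ") + p));
    } catch (IOException e) {
        e.printStackTrace();
    }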

The last two Files methods allow a file to be parsed into lines of text; they conveniently wrap the BufferedReader method. Since creating a BufferedReader requires a Charset to manage byte-to-character conversion, the new Files.lines methods must be given a Charset also:
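    static Stream<String> lines(Path path, Charset cs) throws IOException
    static Stream<String> lines(Path path) throws IOException   // assumes UTF-8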

Naturally, Files.lines has performance characteristics similar to those of its underlying BufferedReader implementation.

java.util.regex.Pattern This class has always had a convenience method split, returning an array; in Java 8 it has added an analogous stream-bearing method splitAsStream:
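    public Stream<String> splitAsStream(CharSequence input)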

Like split, this method divides its input into substrings, each of which is terminated either by a subsequence matching this pattern or by the end of the input. Here are some examples to show the effect of different pattern matches at the beginning of the input string. The method exampleSplit returns a string showing the splits made by the supplied pattern on sample output from ls -l:
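    // A sketch of such a helper; the exact output format of the book's
    // version is not preserved here.
    static String exampleSplit(String pattern, String input) {
        return Pattern.compile(pattern)
                .splitAsStream(input)
                .collect(Collectors.joining("|", "[", "]"));
    }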

If the pattern does not match any subsequence of the input, then the resulting stream contains a single element corresponding to the input sequence (line 2 of Table 5-1). A match of non-zero width at the beginning of the input sequence results in the insertion of an empty string at the beginning of the stream (line 3). A match of zero width does not (line 4).

TABLE 5-1. Behavior of exampleSplit

java.util.jar.JarFile Until now, entries in a jar file could be read in bulk only via an InputStream or an Enumeration; a more convenient stream-bearing method has now been added:
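    public Stream<JarEntry> stream()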

java.util.zip.ZipFile The new method added to this class is similar to that for JarFile:
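    public Stream<? extends ZipEntry> stream()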

java.lang.CharSequence Implementations of this interface include String, StringBuffer, StringBuilder, and java.nio.CharBuffer. Java 8 has added two stream-bearing methods, chars and codePoints, which behave identically for the most frequently used Unicode characters:
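    default IntStream chars()
    default IntStream codePoints()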

The difference between these two methods is in their handling of Unicode supplementary characters, that is, those whose code point is greater than U+FFFF. Supplementary characters include ideographs from East Asian languages and pictorial characters (including emojis). The method chars is generally preferable to codePoints on grounds of efficiency but, unlike codePoints, it does not interpret supplementary characters correctly.

The new methods return IntStream (the most appropriate choice in the absence of a primitive CharStream specialization), so it is useful to know how to convert the int values into a Stream<Character> if the need arises. The key point is that since boxing is required, IntStream.mapToObj must be given the right primitive type to work on:
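    Stream<Character> chars = "hello".chars()
        .mapToObj(c -> (char) c);    // cast to char, then box to Character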

java.util.BitSet The toString method of this class has always returned a string representation of the set of indices of the bits that are set. Now the stream method returns these indices in a form ready for processing:
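    public IntStream stream()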

For example:
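    BitSet bits = new BitSet();
    bits.set(1); bits.set(4); bits.set(9);
    bits.stream().forEach(System.out::println);   // prints 1, 4, 9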

java.util.Random and java.util.SplittableRandom The four Random classes (Random has two subclasses, SecureRandom and ThreadLocalRandom) all expose the same methods for generating random streams. These methods are in three groups, corresponding to the primitive stream types that they return:
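    IntStream ints()
    IntStream ints(long streamSize)
    IntStream ints(int randomNumberOrigin, int randomNumberBound)
    IntStream ints(long streamSize, int randomNumberOrigin, int randomNumberBound)
    // with analogous groups of longs(...) and doubles(...) methods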

Each group has four methods, providing different combinations of choice as to whether they deliver a fixed or unlimited number of values and whether each value delivered lies within a defined range. The class SplittableRandom was introduced in Java 8 to support parallel distribution of random number production. It is typically used in a Spliterator (an object used to create a stream, covered in detail later in this chapter) that, given the requirement to create a random series, will divide the series into segments, assigning a separate generator for the creation of each segment. The division uses the method SplittableRandom.split, which constructs and returns a new instance that shares no mutable state with this instance, but creates a series of values with the same statistical properties as the original.

Although the preceding list includes all the important stream-bearing methods in the platform library—in practice, the most commonly used stream sources—it is by no means the whole story: later in this chapter we shall see how defining a Spliterator enables you to create your own stream from any data source.

5.2 Spliterators and Fork/Join

This section continues our study of stream sources with a brief investigation of how they work. You can use streams effectively without understanding how they are created, as the examples in earlier chapters have shown. But you may want to know how the mechanics of stream creation influence performance—we will see the effects in Chapter 6—and, further, understanding stream creation now will be useful later if you want to create your own streams from a source other than the platform classes. The worked example in §5.4 does exactly that, on a small but realistic scale.

Our starting point is the parallel divide-and-conquer algorithm that introduced streams in Chapter 1 (p. 12) and its implementation by the fork/join framework. The key class in the generation of parallel streams by the framework is a subclass of ForkJoinTask; an instance of this class wraps a stream processing task, consisting of the data to be processed and the processing action to be applied to each element of that data. Task execution takes one of two alternative routes: either the data is processed iteratively in the calling thread, or it is divided, with a new task being created to process part of it on a different thread allocated from the ForkJoinPool, and the rest being assigned to the existing task, to be executed again in the calling thread.

So the data accessor that the task uses must support these two alternatives: splitting and iterative direct execution. The type of this data accessor is named for these two functions: java.util.Spliterator. The two key methods of a Spliterator correspond to its two key functions: trySplit, which causes it to divide itself, partitioning its data between itself and a new Spliterator, which it returns, and tryAdvance, which accepts a Consumer and applies it to the next element. Before looking more closely at Spliterator, let’s first see how it works together with the fork/join framework.

The following pseudocode is an extreme simplification of the fork/join task described earlier. In this code, T is the type of the elements to be processed by the stream; the method makeAbstractTask creates a new task, setting its spliterator field. The value sizeThreshold has been calculated by the framework as the lowest task data size that is worth parallelizing, taking into account both the total original data size and the processing environment:
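    // Pseudocode only, not the framework's real code; processElement stands
    // for the pipeline's per-element action.
    void compute(Spliterator<T> spliterator) {
        Spliterator<T> split;
        while (spliterator.estimateSize() > sizeThreshold
                && (split = spliterator.trySplit()) != null) {
            makeAbstractTask(split).fork();          // process the split on another thread
        }
        spliterator.forEachRemaining(e -> processElement(e));  // process the rest here
    }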

The loop condition for this code shows that a decision to split rather than to process iteratively requires two criteria to be met:

• That a newly forked task could take advantage of currently unused processing capacity. In Java 8, the value of sizeThreshold is just the result of dividing the total data size by the number of threads available, but more sophisticated criteria might be used in the future by the framework to decide this.

• That the data structure’s own conditions are met for making splitting worthwhile. A Spliterator is permitted to respond to a call of trySplit by returning null instead of a new Spliterator, perhaps because the data set is small enough that the gains from going parallel would not outweigh the overhead of splitting, or for some other reason.

These are the key methods of the Spliterator interface:
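    public interface Spliterator<T> {
        Spliterator<T> trySplit();
        boolean tryAdvance(Consumer<? super T> action);
        default void forEachRemaining(Consumer<? super T> action) {...}
        long estimateSize();
        int characteristics();
    }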

All except for forEachRemaining are abstract. A brief explanation of these methods follows. Development of a custom spliterator as part of the worked example in §5.4 should help to clarify further how spliterators work and when you would choose to write your own.

trySplit As we have seen, this method creates a new Spliterator and transfers some—ideally, half—of its elements to it. Splits that do not achieve this ideal may still be effective; for example, splitting an approximately balanced binary tree can still lead to useful parallelism. On the other hand we have BufferedReader, which cannot even approximate an equal split and, at an extreme, classes that cannot split at all and whose spliterators always decline to split (which is why the method Collection.parallelStream is specified to return a “possibly parallel” stream).

If order is significant for a stream source, then the elements that the spliterator covers after it has executed trySplit must succeed those covered by the spliterator that it returned.

tryAdvance This method combines the functions of the Iterator methods next and hasNext. If no elements remain, tryAdvance returns false; otherwise, it returns true after calling the accept method of the supplied Consumer with the next element as the argument. This is the heart of two of the major improvements that spliterators offer over iterators: fewer method calls are needed for each element, resulting in big performance improvements, and there is now no risk of the race conditions that can arise as a result of a collection being mutated between a call of hasNext and a call of next.

forEachRemaining Provides a bulk traversal mechanism: rather than returning after processing a single element, as tryAdvance does, this method processes all the remaining elements in the spliterator in a single call. This may provide performance improvements by eliminating the per-call overheads of tryAdvance. The default implementation simply calls tryAdvance repeatedly until it returns false.

estimateSize Returns an estimate of the number of elements covered by the spliterator. If the spliterator is supplying an infinite stream or if it would be too expensive to compute the size, estimateSize can return Long.MAX_VALUE. However, even an inexact estimate is often useful and inexpensive to compute.

characteristics Returns a set of characteristics (§6.3) for this spliterator.

5.2.1 Streams from Spliterators

Spliterators supply the essence of what is needed to make streams. In the last section of this chapter we will see a practical example of the creation of a stream from a custom spliterator. To actually make a stream, the utility class StreamSupport provides methods that accept a Spliterator to create a reference stream, or one of Spliterator’s specialized primitive subtypes, Spliterator.OfInt, Spliterator.OfLong, or Spliterator.OfDouble, to create a primitive stream:
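    static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel)
    static IntStream intStream(Spliterator.OfInt spliterator, boolean parallel)
    static LongStream longStream(Spliterator.OfLong spliterator, boolean parallel)
    static DoubleStream doubleStream(Spliterator.OfDouble spliterator, boolean parallel)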

It is worth noting here, however, that StreamSupport.stream can also be used as a slightly clumsier substitute for the stream-bearing methods of the Collection interface. That is because every Collection instance is an implementation of Iterable, which exposes a method spliterator. So for any Iterable<T>, iter say, you can write
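    Stream<T> stream = StreamSupport.stream(iter.spliterator(), false);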

where the argument false means that in this case the generated stream will be sequential rather than parallel. This is in fact how the stream-bearing methods of Collection are implemented.

The default implementation of Iterable.spliterator has poor performance, because any good splitting strategy will depend on the physical structure of the specific Iterable. So implementations will usually override it, as the classes of the Java Collections Framework do. But if you have to use an API that accepts or returns Iterable rather than Collection instances, it is worthwhile knowing this way of performing stream processing directly on them, rather than having to first dump them into a collection.

5.3 Exceptions

We saw in §2.7.2 that lambdas have no special mechanism for managing checked exceptions; any checked exception that a lambda throws must be explicitly declared by its function type. As a result, checked exceptions do not fit well with lazy evaluation as implemented in the Stream API. In this section we will explore the problems; it should also become clear why this section is located in the chapter on creating streams: the most common occurrence of checked exceptions in stream processing is in the API for files used as stream sources.

To disentangle the issues, we will look at a series of increasingly complex scenarios. The basic use case that we are working toward is the problem of listing the contents of every text file in a directory. The idea is that Files.list (p. 115), supplied with a directory path, generates a stream of file paths, one for each file in the directory; these are in turn supplied to Files.lines, which generates a stream of lines from each one. These streams can be concatenated into a single stream by flatMap. If no exceptions were in the picture, we could write, for any directory path start:
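    // The idealized pipeline. As written it will not in fact compile,
    // because Files.list and Files.lines throw the checked IOException:
    Files.list(start)
         .flatMap(path -> Files.lines(path))
         .forEach(System.out::println);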

This is the basis of the worked example later in this chapter (§5.4): an implementation in Java of grep, the Unix utility that matches lines in text files against regular expression patterns.

Since the problem revolves around the point at which exceptions are thrown, let’s start with the basic case in which exception throwing is not delayed at all. The following code creates a stream of Path objects corresponding to the contents of the directory whose path is start. For the sake of the example, we will just create a stream without using it, and exception handling will be delegated to the caller by rethrowing the checked exception wrapped in the RuntimeException subclass UncheckedIOException. First, though, we will print a stack trace to show what has happened:
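    // A reconstruction of the code described above:
    try {
        Stream<Path> paths = Files.list(start);     // created but not used
    } catch (IOException e) {
        e.printStackTrace();
        throw new UncheckedIOException(e);
    }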

IOException will be thrown and caught if start is not a path to a directory that can be opened for reading. For example, if the directory permissions make it inaccessible, the stack trace will look like this:

This is in line with our ideas of how exceptions normally work, though perhaps not obviously in line with our expectations about the lazy evaluation of streams. No terminal method has been invoked on the stream created by the call of Files.list, so its elements have not been evaluated. But the attempt to open the directory ./fooDir has been made eagerly.

Next, suppose instead that the directory has been opened and the stream of paths has been constructed successfully. Now flatMap calls Files.lines, which will first open each file and then read its contents into a stream of lines that will be appended into the stream to be returned. What will happen if the attempt to open one of these files fails? As the introduction to Files mentioned (p. 115), I/O-based stream-creating methods like Files.lines throw the checked exception IOException in this situation. Instead of a directory, we can make a file inaccessible to show the problem; in the interests of having the simplest possible intermediate operation, we use peek to evaluate the behavioral parameter:
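    // A reconstruction: peek forces each path through Files.lines, so a
    // failure to open a file surfaces during the terminal operation.
    // (The enclosing method is assumed to throw IOException.)
    try (Stream<Path> paths = Files.list(start)) {
        paths.peek(path -> {
                 try {
                     Files.lines(path);
                 } catch (IOException e) {
                     throw new UncheckedIOException(e);
                 }
             })
             .forEach(System.out::println);
    }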

That code produces the following output:

Notice the stack frame for forEach. This corresponds with our understanding that it is execution of the terminal operation that initiates evaluation of the pipeline elements.

Finally, we can replace the call of peek with flatMap, which will take the streams of strings from each file and concatenate them into a single stream:
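    // The enclosing method is assumed to throw IOException.
    try (Stream<Path> paths = Files.list(start)) {
        paths.flatMap(path -> {
                 try {
                     return Files.lines(path);
                 } catch (IOException e) {
                     throw new UncheckedIOException(e);
                 }
             })
             .forEach(System.out::println);
    }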

In this version, the failure to open a file will produce a stack trace very like the last example. But suppose instead that Files.lines succeeds in opening a file and returns a stream, as yet unevaluated, of the lines that it contains. Once the files are open, the terminal operation will call the flattening code to concatenate the separate streams of lines into a single stream. In order to do this, the streams must be evaluated and the now-opened files must be read; this action too can fail. But the lambda whose execution opened the file is now out of scope and the file-reading code is being called directly, so failure notification must take place via an unchecked exception. To emphasize this, we can break the pipeline code down to an equivalent version in which each stage is extracted to a local variable:
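    // A sketch of the broken-down pipeline; the files are read only when
    // forEach executes, long after flatMap has returned.
    Stream<Path> paths = Files.list(start);
    Stream<String> lines = paths.flatMap(path -> {
        try {
            return Files.lines(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    });
    lines.forEach(System.out::println);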

Since flatMap is no longer on the call stack when its lambda fails, its throws clause is irrelevant. For this reason, platform library code used to create streams always wraps checked exceptions in unchecked ones. The most common cases are covered by wrapping IOException in the new java.io.UncheckedIOException; this is thrown when failures occur after a file has been opened. In the case of such a failure, the stack trace will look like this:

This stack results from the evaluation of one of the streams of lines raising a MalformedInputException, thrown in the course of processing a file by a Charset that has encountered a byte sequence it cannot decode (for example, as a result of attempting to read a binary file as though it were text). This checked exception is caught by the hasNext method of the iterator underlying BufferedReader.lines (called from Files.lines) and wrapped in an UncheckedIOException that rises uncaught to the top of the call stack. The “18 more” stack frames referred to at the bottom of the checked exception stack are the same 18 frames that constitute the whole of the unchecked stack above it; the duplication is omitted for brevity.

So lazily evaluated code called from within pipeline operations can throw only unchecked exceptions, which will end the terminal operation and so stop pipeline processing altogether. If instead we expect errors from which we want to recover, they must be notified by checked exceptions thrown from eagerly evaluated operations. For example, we saw earlier that attempting to read characters from a binary file results in a (checked) MalformedInputException; for the current use case—printing the contents of every text file in a directory—there is a simple recovery action: skip that file. But that action can be taken only if the method throwing the exception is eagerly evaluated, as happens with Files.readAllLines, which eagerly reads an entire file, throwing a checked exception if it encounters an undecodable byte sequence. It is this method that will form the basis of our solution to the recursive grep problem of the next section.

5.4 Worked Example: Recursive grep

The command-line utility grep was originally written as a stand-alone application for an early version of Unix, to search text files for lines matching a supplied pattern in the form of a regular expression and, by default, to print them. Since then, grep, in different variants, has become standard in the core library of every version of Unix and Linux. Various options can modify its actions in different ways, including:

• Recursively searching entire directory subtrees

• Searching for lines that do not match the pattern

• Suppression of match printing

• Printing the names of files containing matches (instead of, or as well as, printing the matching lines)

• Printing the count of matches in each file searched

• Printing the context of a match (several lines before or after the match)

The exercise of reproducing the behavior of grep using streams is interesting in itself, and potentially useful for embedding grep’s functionality within a Java program. (But please note: if you really are thinking of creating a performant grep substitute, you should pay close attention to the spliterator implementation at the end of this section.) We will begin by executing the simplest grep behavior on each file in a directory tree, and then go on to incorporate a few of its many options.

grep -Rh The first version of the problem is to search every file whose name begins with test and has the extension .txt, anywhere in the filesystem under the directory startDir, for mixed decimal numbers. These will match with the regular expression “[-+]?[0-9]*\.?[0-9]+”. Our program will imitate the behavior of grep -Rh: the -R option forces grep to search the filesystem recursively, and the -h option prevents the usual prefixing of each matching output line with the name of the file in which it was found. (Implementing that feature will be the next problem after this one.) We will need the following preliminary declarations:
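A plausible reconstruction of those declarations (the variable names are assumptions, not the book's originals):

    static final Path startDir = Paths.get("startDir");         // assumed root
    static final Pattern linePattern =
        Pattern.compile("[-+]?[0-9]*\\.?[0-9]+");                // mixed decimals
    static final PathMatcher fileMatcher =
        FileSystems.getDefault().getPathMatcher("glob:**/test*.txt");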

Two kinds of pattern matching are in use here: regular expressions and globbing, which you may know as shell pattern matching. Globbing is used here to show how easily it can be introduced into stream-processing programs. Although it is less powerful than regular expression matching, it is much more concise and convenient in those cases where it does work.

A suitable starting point is the code we developed in exploring the subject of exceptions thrown in pipeline operations (p. 123). Here it is repeated for convenience, only changed in respect of an added assumption that the enclosing method can rethrow an IOException resulting from a failure to open the start directory, so it need not be handled in this code:
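    // A reconstruction; the enclosing method rethrows the IOException
    // thrown if the start directory cannot be opened.
    try (Stream<Path> paths = Files.list(startDir)) {
        paths.flatMap(path -> {
                 try {
                     return Files.lines(path);
                 } catch (IOException e) {
                     e.printStackTrace();
                     throw new UncheckedIOException(e);
                 }
             })
             .forEach(System.out::println);
    }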

This requires some alterations before it can become our first version. Stop reading for a moment, examine the code carefully, and make a list of the changes that are needed to make it satisfy the problem description given. (We will still leave calls of printStackTrace as placeholders for exception handling.)

________________________________________________________

Here is a list of the changes needed:

• Calling Files.list, we will see only files in startDir. To search a directory subtree, we should use Files.walk (see p. 115).

• For the reasons explored in the discussion of exceptions, the call of the lazily evaluated method Files.lines must be wrapped in code that rethrows IOException as UncheckedIOException, or replaced by a call to Files.readAllLines (p. 126). Files.readAllLines returns a list of strings that can be used as a stream source.

• The pipeline needs some filters on the paths produced by Files.walk: to remove directories from the Path stream (this is more efficient than letting them be trapped by a failure in readAllLines), and to ensure that only files matching the name pattern “**/test*.txt” are processed.

• Filters are also needed on the text lines returned from the files: to remove the empty strings resulting from processing of non-text files and, of course, to remove lines that do not match the regular expression.

• The terminal operation should print the matching lines. It uses forEach, which does not impose ordering, except accidentally. For the moment, let’s diverge from grep by assuming that we only care about getting the matching lines, not about the order in which they appear.

Applying these changes to the preceding code, we get code that reproduces the behavior of grep -Rh:
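    // A sketch of the assembled first version, using the declarations
    // assumed earlier; the enclosing method rethrows IOException.
    try (Stream<Path> paths = Files.walk(startDir)) {
        paths.filter(path -> !Files.isDirectory(path))
             .filter(fileMatcher::matches)
             .flatMap(path -> {
                 try {
                     return Files.readAllLines(path).stream();
                 } catch (IOException e) {            // e.g. MalformedInputException
                     return Stream.of("");            // placeholder for a non-text file
                 }
             })
             .filter(line -> !line.isEmpty())         // drop the placeholders
             .filter(line -> linePattern.matcher(line).find())
             .forEach(System.out::println);
    }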

grep -R For the first variant on this solution, consider removing the option -h so that grep is required to prefix every output line with the path of the file in which it was found. Before reading on, stop and think how you would change the preceding code to implement this, restructuring it if necessary.

________________________________________________________

Clearly, the path can only be prefixed to the lines of the file at a point where it is still available, which means within the lambda for the flatMap. In the body of the inner try, the lambda parameter path is in scope and can be prefixed to each line:
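    // A sketch of the changed flatMap lambda only; the rest of the
    // pipeline is unchanged.
    .flatMap(path -> {
        try {
            return Files.readAllLines(path).stream()
                        .map(line -> path + ": " + line);   // path is in scope here
        } catch (IOException e) {
            return Stream.of("");
        }
    })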

That is a solution, of sorts. But it is unsatisfactorily wasteful, in that it performs string concatenation—an expensive operation—for every line in every text file, even though many of those lines are likely to be discarded by the regex matching filter later on. That waste can easily be avoided by moving the regex matching filter from its downstream position to inside the flatMap lambda, before the path is concatenated with each line.

The resulting solution is a small modification of the preceding one:
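    // A sketch: filter before concatenating, so the path prefix cannot
    // produce false matches and no concatenation is wasted. The downstream
    // regex filter is now removed.
    .flatMap(path -> {
        try {
            return Files.readAllLines(path).stream()
                        .filter(line -> linePattern.matcher(line).find())
                        .map(line -> path + ": " + line);
        } catch (IOException e) {
            return Stream.of("");
        }
    })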

grep -Rc The -c option suppresses normal output, instead printing a count of matching lines for each input file. Again, stop and think how to do this before reading on.

________________________________________________________

This solution is quite straightforward, once you see that since the output lines are one-to-one with files containing matching text, the output from each file must be collected into a single string before being passed to the terminal operation. So map rather than flatMap is the appropriate operation, with each path, startDir/foo say, being mapped to a string like “startDir/foo: 3”. The line count can be obtained by applying the terminal operation count to the stream of matching lines for each file. This gives us:
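    // A sketch: map each file to "path: count" and print one line per file.
    try (Stream<Path> paths = Files.walk(startDir)) {
        paths.filter(path -> !Files.isDirectory(path))
             .filter(fileMatcher::matches)
             .map(path -> {
                 long count = 0;
                 try {
                     count = Files.readAllLines(path).stream()
                             .filter(line -> linePattern.matcher(line).find())
                             .count();
                 } catch (IOException e) {}   // non-text file: count stays 0
                 return path + ": " + count;
             })
             .forEach(System.out::println);
    }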

grep -b The -b option requires each line of output to be prefixed by its character displacement in the file. So a search for the pattern “W.*t” in this farsighted tribute to immutability:1

should produce the output

At first sight, this looks very like the book displacement problem of Chapter 4 (p. 90), and indeed one option would be to solve it in just the same way using a custom collector. Closer examination, however, shows an important difference in grep -b that will lead to a better solution. The book displacement problem requires the calculation of a running total, making it an example of a prefix sum, a problem in which the value of every element depends on the values of the preceding ones. In a naïve parallel algorithm for prefix sum, like the book displacement example, the total cost of the combine operations (that is, the total delay imposed by them) is proportional to the size of the input data set, so going parallel may not produce the performance gains that we would like to see.2

The current problem can be recast, however, to avoid this difficulty. If we could treat the data of the input file as a byte array in memory, the displacement of each line could be determined without any need to use the displacements of its predecessors. The memory mapping feature of Java NIO provides exactly this ability; its use allows the operating system to manage the correspondence between filestore and memory transparently and efficiently. For medium or large files (memory mapping is usually worthwhile only for files of hundreds of kilobytes in size, or more), memory mapping can offer big performance gains; in particular, random access carries no performance penalty. Memory mapping is implemented in Java by the class java.nio.MappedByteBuffer.

Notice that the earlier grep examples, like most file processing tasks, are also unlikely to benefit from parallelization, because the performance bottleneck will be at the file input stage rather than in stream processing. So the Spliterator technique we are about to use would improve performance for any such task, provided that the file is large enough to justify the overhead of memory mapping.

In this problem, memory mapping has the added advantage of allowing us to avoid having to calculate the displacement of each line from that of its predecessor; instead, when the custom spliterator breaks the buffer into lines, the displacement of each line will be known from its index in the buffer. The function of the spliterator will be to supply the stream with value objects, each of which encapsulates a line together with its index:
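A minimal sketch of such a value class, under the assumptions stated in the text:

    public class DispLine {
        public final long disp;      // displacement of the line start in the file
        public final String line;
        public DispLine(long disp, String line) {
            this.disp = disp;
            this.line = line;
        }
        @Override public String toString() { return disp + " " + line; }
    }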

Spliterators are usually named by the data structure that they are splitting, but to describe this spliterator adequately, its name should include not only its data structure, ByteBuffer, but also the type of the objects that it will yield. Since the name ByteBufferToDispLineSpliterator is too long to use in code to be printed in book form, we’ll abbreviate it to LineSpliterator. A LineSpliterator is constructed to cover a range of the ByteBuffer bounded by two indices, lo and hi, both inclusive, supplied at construction time:
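    // A sketch of the class skeleton implied by the text:
    public class LineSpliterator implements Spliterator<DispLine> {
        private final ByteBuffer bb;
        private int lo, hi;                        // range covered, both ends inclusive
        public LineSpliterator(ByteBuffer bb, int lo, int hi) {
            this.bb = bb;
            this.lo = lo;
            this.hi = hi;
        }
        // trySplit, tryAdvance, estimateSize, and characteristics follow
    }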

Before reading on, see how much of the implementation of LineSpliterator you can supply, starting with trySplit.

__________________________________________________

Figure 5-1 shows the buffer at the start of processing. The range of the first spliterator created is the data in the entire buffer, including the terminating newline byte.

FIGURE 5-1. Initial LineSpliterator configuration

If this range is to be split, the aim will be to divide the buffer into two approximately equal parts, each of them again terminated by a newline. A suitable dividing point between the parts will be found by making a linear search for a newline, in either direction, starting from the midpoint of the buffer. This search represents the algorithmic part of the overhead of splitting (the rest is the infrastructure overhead of creating and forking a new fork/join task). Its cost is proportional to the average line length, rather than to the total buffer size as with the book displacement program (§4.3.2).

Figure 5-2 shows this search process and the coverage of the two spliterators that result.

FIGURE 5-2. Operation of LineSpliterator.trySplit

Suppose, however, that the search fails to detect a newline character before encountering the end of the LineSpliterator range. Given that it starts from the range midpoint, such a failure suggests that there are probably only a few lines in the entire range—possibly only one—so in this situation it is reasonable to decline to split by returning null rather than searching for a newline in the opposite direction.

Once you have understood the algorithm for trySplit, the code is straightforward:
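    // A sketch: search forward from the midpoint for a newline; if none is
    // found before the end of the range, decline to split.
    @Override
    public Spliterator<DispLine> trySplit() {
        int mid = (lo + hi) >>> 1;
        while (mid < hi && bb.get(mid) != '\n') mid++;
        if (mid == hi) return null;                   // too few lines to be worth splitting
        LineSpliterator prefix = new LineSpliterator(bb, lo, mid);
        lo = mid + 1;                                 // this spliterator keeps the suffix
        return prefix;
    }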

If you have not already written code for tryAdvance, stop now and sketch an implementation.

________________________________________________________

The purpose of tryAdvance is to facilitate processing of the next available DispLine instance. It searches for the next newline character, creates a DispLine instance from the bytes traversed in the search, and applies its supplied Consumer argument to it. Finally, it reduces the spliterator’s range to exclude the bytes just processed, and returns a boolean signifying whether an element was processed (false means that no input remained).
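A sketch of an implementation along those lines:

    @Override
    public boolean tryAdvance(Consumer<? super DispLine> action) {
        if (lo > hi) return false;
        int index = lo;
        StringBuilder sb = new StringBuilder();
        while (bb.get(index) != '\n') {
            sb.append((char) bb.get(index++));   // assumes a single-byte encoding
        }
        action.accept(new DispLine(lo, sb.toString()));
        lo = index + 1;                          // exclude the bytes just processed
        return true;
    }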

Two other Spliterator methods must be overridden. The method estimateSize returns an estimate, necessarily approximate in this case, of the number of elements that would be returned by repeated calls of tryAdvance.
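A sketch, assuming an average line length (the constant here is a guess, not the book's value):

    private static final int AVG_LINE_LENGTH = 40;   // assumed average

    @Override
    public long estimateSize() {
        return (hi - lo + 1) / AVG_LINE_LENGTH + 1;
    }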

The method characteristics returns the characteristics of this Spliterator, used by the framework to choose appropriate optimizations (see §6.3). Here is a suitable implementation of characteristics for this problem:
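    // One reasonable choice for this problem; note that SIZED is absent,
    // because the size is only an estimate.
    @Override
    public int characteristics() {
        return ORDERED | IMMUTABLE | NONNULL;
    }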

Finally, here is how LineSpliterator could be used to simulate the action of grep -b. This is the code for searching a single file; the earlier examples show how it could be extended to a recursive search:
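    // A sketch of the single-file search; the file name is hypothetical,
    // and the enclosing method is assumed to throw IOException.
    Pattern pattern = Pattern.compile("W.*t");
    try (FileChannel channel =
            FileChannel.open(Paths.get("rubaiyyat.txt"), StandardOpenOption.READ)) {
        MappedByteBuffer bb =
            channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
        Spliterator<DispLine> ls = new LineSpliterator(bb, 0, bb.limit() - 1);
        StreamSupport.stream(ls, true)                 // true: a parallel stream
            .filter(dl -> pattern.matcher(dl.line).find())
            .forEachOrdered(System.out::println);
    }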

We saw StreamSupport used earlier (§5.2.1) to create streams from spliterators. In this case, we are supplying true as the second parameter to StreamSupport.stream to ensure that a parallel, rather than sequential, stream will be generated; this is in contrast to the earlier examples, in which, as we saw, there is little point in invoking parallelism. For the terminal operation, we are using forEachOrdered to produce the output. In principle, of course, out-of-order execution is possible with forEach even on sequential streams; that might have been acceptable for the earlier grep options, but for this one a user would certainly be surprised if the matching lines appeared in a random order.

Summarizing this section, we should note that programming a grep equivalent has shown some strengths of the Stream API: the solutions for the earlier examples are straightforward, and respecifying the problem to simulate the different options required only small corresponding changes in the code. In the last example, we saw how cooperation with Java NIO enables the Stream API to process input from large files with, as we shall see in §6.7, a very high degree of parallel speedup.

5.5 Conclusion

This chapter had three connected purposes: to explore the library facilities for creating streams, to explain the mechanism by which stream creation works, and to show how and why you would write your own implementation of it. We saw that stream-bearing methods have been added to a wide variety of platform classes, allowing streams to fulfill their role of conveying any kind of data for processing through intermediate operations and into a terminal operation. In processing streams from any of these methods, you gain the advantages in expressiveness and readability of parallel-ready code; whether you can effectively extract parallelism depends, as we have seen, on how effectively the stream source can split its data. Splittability is only part of the story, however; in Chapter 6 we will see how it combines with other factors, such as data set size and pipeline workload, to determine the overall speedup that can be obtained by parallelization.

The main example of the chapter showed off some of the strengths of the Stream API: the solutions for the earlier grep options were straightforward, and respecifying the problem to simulate the different options required only small corresponding changes in the code. In the last part of the example, we saw how cooperation with Java NIO enables the Stream API to process input from large files with, as we shall see in §6.7, a very high degree of parallel speedup.

____________

1The Rubaiyyat of Omar Khayyam (q. 51), trans. Edward Fitzgerald, 1st edition

2The seriousness of this problem depends on how expensive the combine operation is. Java 8 provided java.util.Arrays with various overloads of a new method parallelPrefix for computing prefix sums efficiently, but this innovation has not yet reached the Stream API.