Stream API - Wrox Press Java Programming 24-Hour Trainer 2nd (2015)


Lesson 20. Stream API

In this lesson you’ll learn how to work with the new Stream application programming interface (API) introduced in Java 8 (not to be confused with I/O streams). The Stream API enables you to write data-processing code in a simpler and more understandable way. Most of the examples in this chapter illustrate iterating and manipulating data from Java collections, but you should know from the very start that a stream is not just another type of data collection. It’s an abstraction over a sequence of data that your program needs to process.

The data can come from a collection, from a function that generates data, or from an I/O stream. Using the Stream API and lambda expressions, you can write simple-to-read and efficient iterations that produce a subset of the incoming data or some kind of data aggregation.

All new classes and interfaces supporting the Stream API are located in the package java.util.stream, and the Stream interface is the main player there. Some existing classes in other packages (for example, BufferedReader) now include new methods that return their data as a Stream.

Stream Basics

A stream is an abstraction that represents zero or more values. Think of it as a fancy iterator that enables you to declare one or more operations on the data and then perform these operations in one pass. But whereas a regular Java Iterator works sequentially, streams can also be processed in parallel.

Let’s start with a simple example. This chapter uses a collection of beers to make working with the Stream API more fun. The class Beer is shown here:

public class Beer {
    public String name;
    public String country;
    public float price;

    Beer(String name, String country, float price) {
        this.name = name;
        this.country = country;
        this.price = price;
    }

    public String toString() {
        return "Name: " + name + ", price: " + price;
    }
}

Say you have a collection named beers that can be populated by the method loadCellar(). Now you want to create another collection that includes only American beers. This is how you can do it using the Stream API:

List<Beer> beers = loadCellar(); // populating beer collection

// collect() creates the new list, so there is no need to allocate one up front
List<Beer> americanBeers = beers.stream()
        .filter(brr -> "USA".equals(brr.country))
        .collect(Collectors.toList());

Calling the method stream() sets the beers collection as the source of the stream. Then your code filters out the beer objects whose country is not the United States, so only the American beers remain. Finally, the code invokes the method collect() to place the filtered beers into another list: americanBeers. In this example I’ve chained only two operations, filter and collect. But you can specify a lot more, including map, reduce, find, sort, and match.

Note that I’ve used a lambda expression to specify the filter criteria. Another interesting thing to note in the preceding code is type inference: the type of the variable brr was never declared. Because I’ve used generics in declaring the beers collection, the Java compiler knows that it stores objects of the Beer type, and when you use lambdas to process these objects, Beer is inferred as the argument type.
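To run the filter-and-collect pipeline in isolation, here is a minimal sketch. It assumes a trimmed-down Beer class nested in a hypothetical FilterDemo class, and a hypothetical sampleCellar() method with four hard-coded beers standing in for loadCellar(); neither is part of the lesson’s code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterDemo {

    // Simplified Beer with the same public fields as in this lesson
    static class Beer {
        public String name;
        public String country;
        public float price;

        Beer(String name, String country, float price) {
            this.name = name;
            this.country = country;
            this.price = price;
        }

        @Override
        public String toString() {
            return "Name: " + name + ", price: " + price;
        }
    }

    // Hypothetical stand-in for loadCellar() with a few sample beers
    static List<Beer> sampleCellar() {
        return Arrays.asList(
                new Beer("Stella", "Belgium", 7.75f),
                new Beer("Sam Adams", "USA", 7.00f),
                new Beer("Obolon", "Ukraine", 4.00f),
                new Beer("Bud Light", "USA", 5.00f));
    }

    // filter() keeps beers whose country is "USA"; collect() gathers them into a new List
    static List<Beer> americanBeers(List<Beer> beers) {
        return beers.stream()
                .filter(brr -> "USA".equals(brr.country)) // brr's type is inferred as Beer
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        americanBeers(sampleCellar()).forEach(System.out::println);
    }
}
```

Running main() should print Sam Adams and Bud Light, the two American beers in the sample cellar, while the list returned by sampleCellar() still holds all four entries.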

Intermediate and Terminal Operations

There are two types of operations that you can apply to a stream: intermediate and terminal. You can specify any number of intermediate operations but only one terminal operation, at the end. In the example, filter() is an intermediate operation and collect() is a terminal operation.

Each intermediate operation declares what you want to do with the stream data before the terminal operation is applied. The terminal operation produces a concrete result: for example, a value such as a sum or an average, some printed output, or a new collection (as in this example).

How can you tell whether a particular operation is intermediate or terminal? Intermediate operations always return a stream (a Stream or one of its primitive counterparts, such as DoubleStream), whereas terminal ones return anything but a stream (including void). In fact, stream operations can be chained into a pipeline precisely because each intermediate operation returns a stream.
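The return-type rule also explains why a stream supports only one terminal operation. The following sketch (class and method names are hypothetical) chains an intermediate filter(), ends the pipeline with the terminal count(), and then tries to call count() again on the same stream; the JDK rejects the second call with an IllegalStateException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class OperationKindsDemo {

    static final List<String> COUNTRIES = Arrays.asList("Belgium", "USA", "Ukraine", "USA");

    // Returns true if a second terminal operation on the same stream is rejected
    static boolean reuseAfterTerminalFails() {
        // filter() is intermediate: it returns a Stream, so the pipeline can continue
        Stream<String> usOnly = COUNTRIES.stream().filter(c -> "USA".equals(c));

        // count() is terminal: it returns a long and consumes the stream
        long howMany = usOnly.count();
        System.out.println("American entries: " + howMany);

        try {
            usOnly.count(); // second terminal operation on an already consumed stream
            return false;
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Reuse rejected: " + reuseAfterTerminalFails());
    }
}
```

If you need to run a second terminal operation, create a new stream from the source by calling stream() again.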

Lazy Operations

Intermediate operations don’t even try to read data until the terminal operation is invoked. The whole idea of using intermediate operations is to express your intentions in a declarative form. Consider the following code snippet:

OptionalDouble averagePrice = beers.stream()
        .filter(brr -> "USA".equals(brr.country))
        .mapToDouble(brrr -> brrr.price)
        .average();

I explain this code a bit later, but it reveals the intentions: “We want to filter out non-American beers and then apply the map operation to extract the beer price. Finally, we calculate the average beer price.” Neither filter() nor mapToDouble() is invoked until the average() method is called (I prove it to you in the next code sample). Knowing upfront what you are planning to do with the data allows Java to plan an efficient execution of this code. For example, thanks to lazy execution, mapToDouble() isn’t performed on the beers that are not produced in the United States.

Before giving you the proof that intermediate operations are not called until the terminal operation is invoked, I need to explain what the terms filter, map, and reduce mean. If you know the basics of working with relational databases, the following SQL analogies should help.

· Filter: Select the objects that meet certain criteria. It’s like using select * with a where clause in an SQL statement. The resulting stream can be smaller than the original, but the resulting objects include all attributes (think columns).

· Map: Transform each object, for example by selecting only a subset of its properties, without filtering. It’s like selecting specific columns in an SQL query without the where clause, for example select price from beers. Map creates a new stream by applying the specified function to each stream element. The resulting stream has the same number of elements as the original.

· Reduce: Aggregate the data. The relevant SQL examples would be select count(*) or select sum(price).
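The three SQL analogies can be sketched in code as follows. This is an illustration under assumptions: the nested Beer class, the four-beer list, and the method names are hypothetical, and each comment names the SQL statement the operation resembles.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterMapReduceDemo {

    static class Beer {
        final String name;
        final String country;
        final float price;

        Beer(String name, String country, float price) {
            this.name = name;
            this.country = country;
            this.price = price;
        }
    }

    static final List<Beer> BEERS = Arrays.asList(
            new Beer("Stella", "Belgium", 7.75f),
            new Beer("Sam Adams", "USA", 7.00f),
            new Beer("Bud Light", "USA", 5.00f),
            new Beer("Yuengling", "USA", 5.50f));

    // Filter: like select * from beers where country = 'USA'
    static List<Beer> filterUsa() {
        return BEERS.stream()
                .filter(b -> "USA".equals(b.country))
                .collect(Collectors.toList());
    }

    // Map: like select name from beers (one result per beer, no filtering)
    static List<String> mapNames() {
        return BEERS.stream()
                .map(b -> b.name)
                .collect(Collectors.toList());
    }

    // Reduce: like select sum(price) from beers
    static double sumPrices() {
        return BEERS.stream()
                .mapToDouble(b -> b.price)
                .reduce(0.0, (subtotal, price) -> subtotal + price); // sum() gives the same total here
    }

    // Reduce: like select count(*) from beers where country = 'USA'
    static long countUsa() {
        return BEERS.stream()
                .filter(b -> "USA".equals(b.country))
                .count();
    }

    public static void main(String[] args) {
        System.out.println(filterUsa().size()); // 3
        System.out.println(mapNames());         // [Stella, Sam Adams, Bud Light, Yuengling]
        System.out.println(sumPrices());        // 25.25
        System.out.println(countUsa());         // 3
    }
}
```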

Now let’s look at the proof that intermediate operations don’t access the data. In the class LazyStreamsDemo I’m not going to chain the operations on the stream:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;

public class LazyStreamsDemo {

    // Populate beer collection
    static List<Beer> loadCellar() {
        List<Beer> beerStock = new ArrayList<>();
        beerStock.add(new Beer("Stella", "Belgium", 7.75f));
        beerStock.add(new Beer("Sam Adams", "USA", 7.00f));
        beerStock.add(new Beer("Obolon", "Ukraine", 4.00f));
        beerStock.add(new Beer("Bud Light", "USA", 5.00f));
        beerStock.add(new Beer("Yuengling", "USA", 5.50f));
        beerStock.add(new Beer("Leffe Blonde", "Belgium", 8.75f));
        beerStock.add(new Beer("Chimay Blue", "Belgium", 10.00f));
        beerStock.add(new Beer("Brooklyn Lager", "USA", 8.25f));
        return beerStock;
    }

    public static void main(String[] args) {
        List<Beer> beers = loadCellar();

        // First intermediate operation
        Stream<Beer> americanBeers = beers.stream()
                .filter(brrsssss -> {
                    System.out.println("Inside filter: " +
                            brrsssss.country);
                    return "USA".equals(brrsssss.country);
                });

        // Second intermediate operation
        DoubleStream americanBeerPrices = americanBeers
                .mapToDouble(brrr -> {
                    System.out.println("Inside mapToDouble: "
                            + brrr.name + ": " + brrr.price);
                    return brrr.price;
                });

        // Commented out terminal operation
        //System.out.println("The average American beer price is $" +
        //        americanBeerPrices.average().getAsDouble());
    }
}

The preceding program creates a stream from the beers collection and then applies two intermediate operations: filter() and mapToDouble(). The first one filters out non-American beers, and the second performs the map operation to keep only the beer price, ignoring the beer’s other fields. The LazyStreamsDemo class has a terminal operation that’s supposed to calculate the average price of American beers, but I commented it out on purpose.

Note that each intermediate operation has a println() statement. If you run the program LazyStreamsDemo as is, you don’t see any output on the system console. The intermediate operations are not invoked on a stream until a terminal operation is invoked! The intermediate operations just declare your intentions. Now uncomment the last two lines in LazyStreamsDemo and rerun it. This time you see plenty of output:

Inside filter: Belgium
Inside filter: USA
Inside mapToDouble: Sam Adams: 7.0
Inside filter: Ukraine
Inside filter: USA
Inside mapToDouble: Bud Light: 5.0
Inside filter: USA
Inside mapToDouble: Yuengling: 5.5
Inside filter: Belgium
Inside filter: Belgium
Inside filter: USA
Inside mapToDouble: Brooklyn Lager: 8.25
The average American beer price is $6.4375

The mapToDouble() operation was invoked only for the American beers. Note that mapToDouble() returns a stream of type DoubleStream. It’s a special type of stream that works on primitive double values, and we use it to calculate the average of the beer prices. There are also IntStream and LongStream to work with int and long data types, respectively.
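The following sketch shows the three primitive stream types side by side. It applies them to the lengths of a few beer names instead of prices; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalDouble;

class PrimitiveStreamsDemo {

    static final List<String> NAMES = Arrays.asList("Stella", "Obolon", "Bud Light");

    // IntStream: sums int values without boxing them into Integer objects
    static int totalNameLength() {
        return NAMES.stream().mapToInt(String::length).sum();
    }

    // LongStream: max() returns OptionalLong because the stream could be empty
    static long longestNameLength() {
        return NAMES.stream().mapToLong(String::length).max().getAsLong();
    }

    // DoubleStream: average() returns OptionalDouble, as in the beer price example
    static OptionalDouble averageNameLength() {
        return NAMES.stream().mapToDouble(String::length).average();
    }

    public static void main(String[] args) {
        System.out.println(totalNameLength());                 // 21
        System.out.println(longestNameLength());               // 9
        System.out.println(averageNameLength().getAsDouble()); // 7.0
    }
}
```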

To summarize, treat intermediate operations as a laundry list of required actions that are performed along with the terminal operation in one pass. Neither intermediate nor terminal operations modify the source data; a stream pipeline produces new results instead of changing the collection it came from.

Parallel Versus Sequential Processing

A party of ten walks into an Oktoberfest tent. They are seated at a table, and the waiter stops by. One of the guys says, “Please bring us ten mugs of Leffe Blonde, and do it as follows: go to the bar, fill the first mug and bring it here; then return and do the same with the second one. Repeat ten times.” The waiter politely replies, “Please don’t tell me how to bring your beer.” He goes to the bar, which has ten beer dispensers, fills all ten mugs in parallel, and brings them all at the same time. The waiter optimized the process. He just needed the customers to tell him what to do, but not how to do it.

Parallel processing rules! I’ve already mentioned this while describing iterating collections with the forEach() method in Lesson 13. The same applies to streams, with one caveat: the method stream() returns a sequential stream. For the Java runtime to internally split the data into chunks, perform the operations on the chunks in parallel, and combine the results, you need to ask for a parallel stream explicitly.

To request parallel processing, use the method parallelStream() on your collection (or call parallel() on an existing stream), which may internally use multiple threads for processing the stream’s data. Java 7 introduced the Fork/Join framework for implementing parallelism, but it was not simple to code. In Java 8 the Fork/Join routine is hidden from application developers inside the stream implementation.

However, there is no guarantee that your application code will perform faster with parallelStream(). You need to benchmark your code by comparing the speed of parallelStream() versus the speed of stream(). The results depend on your application code as well as on the Java internals for your data source. Even the Java documentation states that parallelStream() returns a possibly parallel stream.
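A sketch of such a comparison follows. It assumes a hypothetical list of the numbers 1 to n as the data source, checks isParallel() on both kinds of streams, and takes rough timings with System.nanoTime(). Treat the timings as indicative only; a benchmarking harness such as JMH gives more trustworthy numbers.

```java
import java.util.ArrayList;
import java.util.List;

class ParallelVsSequentialDemo {

    // Hypothetical data source: the numbers 1..n boxed in a List
    static List<Long> numbers(int n) {
        List<Long> list = new ArrayList<>();
        for (long i = 1; i <= n; i++) {
            list.add(i);
        }
        return list;
    }

    static long sequentialSum(List<Long> data) {
        return data.stream().mapToLong(Long::longValue).sum();
    }

    static long parallelSum(List<Long> data) {
        return data.parallelStream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> data = numbers(1_000_000);

        System.out.println("stream() is parallel: " + data.stream().isParallel());
        System.out.println("parallelStream() is parallel: " + data.parallelStream().isParallel());

        // Rough single-run timings; the first run also pays JIT warm-up costs
        long start = System.nanoTime();
        long seq = sequentialSum(data);
        long seqMicros = (System.nanoTime() - start) / 1_000;

        start = System.nanoTime();
        long par = parallelSum(data);
        long parMicros = (System.nanoTime() - start) / 1_000;

        System.out.println("sequential: " + seq + " in " + seqMicros + " us");
        System.out.println("parallel:   " + par + " in " + parMicros + " us");
    }
}
```

Both sums must be equal; which one is faster depends on the machine, the data size, and the data source.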

When to Use Parallel Streams

If you’re interested in a deeper understanding of when to use parallel streams, read the online article “When to use Parallel Streams” written by a group of Java experts led by Dr. Doug Lea.

Sorting Collections and Streams

Sometimes you need to sort data values in ascending or descending order. Hence the Java runtime needs to be able to compare values. It’s a simple task for primitive data types: 3 is greater than 2, and 27 is less than 28. But how do you compare complex data types such as objects? What does it mean to sort the collection of Beer objects? Which of the Beer’s properties should be compared to place beers in sorted order: prices, names, countries, or a combination of these attributes?

A programmer needs to specify the rules for object comparison—for example, sort beers by price in an ascending order. Let’s see how to specify sorting rules for general Java collections first and then how to sort streams.

Sorting Java Collections

A collection can consist of multiple objects, but you just need to know how to compare two objects to place them in a certain order. The method sort() then compares pairs of objects as needed while ordering the whole collection. Only on rare occasions do people need to implement a different sorting algorithm; most likely, invoking the method sort() is all you need. The Java interfaces Comparable and Comparator enable you to specify the comparison rules.

Using the Comparable Interface

If a class implements the Comparable interface, a program can compare the current instance of an object with another object of the same type. You need to add implements Comparable to the class declaration and implement the method compareTo() there. See how the Beer class may look if we want to be able to compare beers by price.

public class Beer implements Comparable<Beer> {
    public String name;
    public String country;
    public float price;

    Beer(String name, String country, float price) {
        this.name = name;
        this.country = country;
        this.price = price;
    }

    public int compareTo(Beer otherBeer) {
        if (this.price > otherBeer.price) {
            return 1;   // This beer is "larger" than the other
        } else if (this.price < otherBeer.price) {
            return -1;  // This beer is "smaller" than the other
        } else {
            return 0;   // The beers are "equal"
        }
    }

    public String toString() {
        return "Name: " + name + ", price: " + price;
    }
}

The method compareTo() takes one argument: the object to be compared with. If, according to our rule, this beer is “larger” than the other, compareTo() must return a positive integer (this example returns 1). If this beer is “smaller,” it must return a negative integer (here, -1). If the values are “equal,” compareTo() returns zero. The current example compares prices.
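The if/else chain in compareTo() can be shortened with Float.compare(), which returns a negative number, zero, or a positive number exactly as the compareTo() contract requires, and also orders NaN and -0.0f consistently. The following sketch uses a hypothetical two-field Beer class nested in a demo class; it is an alternative, not the lesson’s original code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ComparableBeerDemo {

    static class Beer implements Comparable<Beer> {
        final String name;
        final float price;

        Beer(String name, float price) {
            this.name = name;
            this.price = price;
        }

        // Negative if this beer is cheaper, zero if equal, positive if more expensive
        @Override
        public int compareTo(Beer otherBeer) {
            return Float.compare(this.price, otherBeer.price);
        }

        @Override
        public String toString() {
            return name + " " + price;
        }
    }

    static List<Beer> sortedByPrice() {
        List<Beer> beers = new ArrayList<>();
        beers.add(new Beer("Stella", 7.75f));
        beers.add(new Beer("Obolon", 4.00f));
        beers.add(new Beer("Sam Adams", 7.00f));
        Collections.sort(beers); // uses compareTo()
        return beers;
    }

    public static void main(String[] args) {
        System.out.println(sortedByPrice()); // [Obolon 4.0, Sam Adams 7.0, Stella 7.75]
    }
}
```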

We’re going to reuse the same beer collection used earlier in this lesson. The following code snippet uses the class java.util.Collections and shows you how you can sort it by prices:

List<Beer> beers = loadCellar(); // populate beer collection
Collections.sort(beers);
beers.forEach(System.out::println);

Here’s the expected output:

Name: Obolon, price: 4.0
Name: Bud Light, price: 5.0
Name: Yuengling, price: 5.5
Name: Sam Adams, price: 7.0
Name: Stella, price: 7.75
Name: Brooklyn Lager, price: 8.25
Name: Leffe Blonde, price: 8.75
Name: Chimay Blue, price: 10.0

The Comparable interface defines a single natural ordering for a class, which limits its use. If you want to sort beers by name in one place and by price in another, or by several attributes in different combinations, you need another solution: the Comparator interface.

Using the Comparator Interface

You can use a class that implements the Comparator interface to specify the rules for comparing any two objects of a certain type. For example, you can have a class Beer that doesn’t implement any interfaces and, separately, a class PriceComparator that implements the Comparator interface and has the rules for comparing prices. As of Java 8, you don’t even need to create a separate class with comparison rules; you can use lambdas instead. And yes, Comparator is a functional interface with only one abstract method: compare().
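Here is a sketch of both approaches. The PriceComparator class follows the description above, the lambda expresses the same rule, and the sortWith() helper and the nested Beer class are hypothetical additions for the demo.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class PriceComparatorDemo {

    static class Beer {
        final String name;
        final float price;

        Beer(String name, float price) {
            this.name = name;
            this.price = price;
        }
    }

    // Pre-Java 8 style: the comparison rules live in a separate class
    static class PriceComparator implements Comparator<Beer> {
        @Override
        public int compare(Beer b1, Beer b2) {
            return Float.compare(b1.price, b2.price);
        }
    }

    // Hypothetical helper: sorts three sample beers with the given rules and returns their names
    static List<String> sortWith(Comparator<Beer> comparator) {
        List<Beer> beers = new ArrayList<>();
        beers.add(new Beer("Chimay Blue", 10.00f));
        beers.add(new Beer("Bud Light", 5.00f));
        beers.add(new Beer("Stella", 7.75f));
        beers.sort(comparator);

        List<String> names = new ArrayList<>();
        for (Beer beer : beers) {
            names.add(beer.name);
        }
        return names;
    }

    public static void main(String[] args) {
        // A separate class that implements Comparator
        System.out.println(sortWith(new PriceComparator()));
        // Java 8: a lambda implements the single abstract method compare()
        System.out.println(sortWith((b1, b2) -> Float.compare(b1.price, b2.price)));
    }
}
```

Both calls should print the same order, cheapest first, because the class and the lambda encode the same rule.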

Let’s see a couple of examples of sorting beers by one or more attributes using the Comparator interface and lambda expressions. These examples use the original class Beer that doesn’t implement any interfaces:

public class Beer {
    public String name;
    public String country;
    public float price;

    Beer(String name, String country, float price) {
        this.name = name;
        this.country = country;
        this.price = price;
    }

    public String toString() {
        return "Name: " + name + ", price: " + price;
    }
}

The Comparator interface has a static method comparing(), which takes a lambda expression that extracts the attribute to be used for comparison, for example, price:

List<Beer> beers = loadCellar(); // load the beer collection

System.out.println("=== Sorting by ascending price");
beers.sort(Comparator.comparing(beer -> beer.price));
beers.forEach(System.out::println);

Running this code against your beer collection properly sorts the beers by ascending price. It prints the following:

=== Sorting by ascending price
Name: Obolon, price: 4.0
Name: Bud Light, price: 5.0
Name: Yuengling, price: 5.5
Name: Sam Adams, price: 7.0
Name: Stella, price: 7.75
Name: Brooklyn Lager, price: 8.25
Name: Leffe Blonde, price: 8.75
Name: Chimay Blue, price: 10.0

The method reversed() allows sorting in descending order, for example:

Comparator<Beer> priceComparator =
        Comparator.comparing(beer -> beer.price);

System.out.println("=== Sorting by descending price");
beers.sort(priceComparator.reversed());
beers.forEach(System.out::println);

The following is the output of the preceding code snippet:

=== Sorting by descending price
Name: Chimay Blue, price: 10.0
Name: Leffe Blonde, price: 8.75
Name: Brooklyn Lager, price: 8.25
Name: Stella, price: 7.75
Name: Sam Adams, price: 7.0
Name: Yuengling, price: 5.5
Name: Bud Light, price: 5.0
Name: Obolon, price: 4.0

If you want to sort by multiple fields you should use method chaining with one or more invocations of thenComparing(). The following code shows how you can sort beers by name and price:

System.out.println("=== Sorting by name and price");
beers.sort(Comparator.comparing((Beer beer) -> beer.name)
        .thenComparing(beer -> beer.price));
beers.forEach(System.out::println);

The method comparing() expects a key extractor as an argument: a function (for example, a getter) that returns the value to be used for comparison. The preceding code snippet uses lambda expressions instead of getters. When comparing() starts a method chain, the compiler can’t infer the lambda’s parameter type from the surrounding context, so you have to declare it. That’s why this example explicitly specifies the parameter type (Beer beer).

If I had getters in the Beer class, I could have used method references, and the explicit parameter type wouldn’t be required:

beers.sort(Comparator.comparing(Beer::getName)
        .thenComparing(Beer::getPrice));
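To see the method-reference version compile and run, here is a minimal sketch that assumes a Beer class with private fields and the getters getName() and getPrice(). The hypothetical demo class adds a second Stella at a lower price so that thenComparing() has a tie to break.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class MethodReferenceSortDemo {

    // Beer with getters, so method references can serve as key extractors
    static class Beer {
        private final String name;
        private final float price;

        Beer(String name, float price) {
            this.name = name;
            this.price = price;
        }

        public String getName() { return name; }

        public float getPrice() { return price; }

        @Override
        public String toString() { return name + " " + price; }
    }

    static List<Beer> sortByNameThenPrice() {
        List<Beer> beers = new ArrayList<>();
        beers.add(new Beer("Stella", 7.75f));
        beers.add(new Beer("Leffe Blonde", 8.75f));
        beers.add(new Beer("Stella", 6.50f)); // hypothetical second Stella at a lower price
        // Beer::getName tells the compiler the element type, so no (Beer b) declaration is needed
        beers.sort(Comparator.comparing(Beer::getName)
                .thenComparing(Beer::getPrice));
        return beers;
    }

    public static void main(String[] args) {
        System.out.println(sortByNameThenPrice()); // [Leffe Blonde 8.75, Stella 6.5, Stella 7.75]
    }
}
```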

Mutable Collections

Sorting with Collections.sort() or List.sort(), whether the rules come from Comparable or Comparator, modifies (reorders) the original data collection in place, so the collection has to be mutable. This is not the case when sorting streams, which is explained next.
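The following sketch (class and method names are hypothetical) shows both sides of this: sorting an ArrayList reorders the very list you passed in, while sorting a read-only view created with Collections.unmodifiableList() fails with an UnsupportedOperationException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MutableSortDemo {

    // Sorts the given list in place and returns the same list object
    static List<String> sortInPlace(List<String> names) {
        Collections.sort(names);
        return names;
    }

    // Returns true if sorting a read-only view is rejected
    static boolean sortingUnmodifiableFails() {
        List<String> readOnly = Collections.unmodifiableList(
                new ArrayList<>(Arrays.asList("Stella", "Obolon", "Yuengling")));
        try {
            Collections.sort(readOnly);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>(Arrays.asList("Stella", "Obolon", "Yuengling"));
        sortInPlace(names);
        System.out.println(names);                      // [Obolon, Stella, Yuengling]
        System.out.println(sortingUnmodifiableFails()); // true
    }
}
```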

Sorting Streams

Now that you are familiar with the basics of sorting data collections, it’s time to see how you can use the Stream API for sorting any data source. As a reminder, when you work with streams, the original data source stays unchanged. To store the sorted data in another collection, use the terminal operation collect() (or toArray() if you need an array).

The method sorted() accepts a Comparator, so to sort your beer collection by price just write something like this:

beers.stream()
        .sorted(Comparator.comparing(b -> b.price))
        .forEach(System.out::println);

Sorting by multiple fields is done similarly to the code sample from the section on Comparator. The next example shows you how to sort beers by country and price. Slightly modify the method toString() from Beer to print the country too:

public String toString() {
    return "Country: " + country +
            " Name: " + name + ", price: " + price;
}

This is what sorting by beer country and price can look like:

beers.stream()
        .sorted(Comparator.comparing((Beer b) -> b.country)
                .thenComparing(b -> b.price))
        .forEach(System.out::println);

Running this code snippet produces the following output:

Country: Belgium Name: Stella, price: 7.75
Country: Belgium Name: Leffe Blonde, price: 8.75
Country: Belgium Name: Chimay Blue, price: 10.0
Country: USA Name: Bud Light, price: 5.0
Country: USA Name: Yuengling, price: 5.5
Country: USA Name: Sam Adams, price: 7.0
Country: USA Name: Brooklyn Lager, price: 8.25
Country: Ukraine Name: Obolon, price: 4.0

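Note that USA comes before Ukraine: strings are compared character by character, and the uppercase S sorts before the lowercase k. To store the result of the stream sorting in a new List collection, you need to add a terminal operation: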

List<Beer> sortedBeers = beers.stream()
        .sorted(Comparator.comparing(b -> b.price))
        .collect(Collectors.toList());

Now you have two collections. The original collection (beers) keeps its original order, and the new one (sortedBeers) is sorted.
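A small sketch makes the difference visible: it sorts a stream of a hypothetical three-beer list and prints both the source list and the collected result. The nested Beer class and the method name are assumptions for the demo.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class StreamSortDemo {

    static class Beer {
        final String name;
        final float price;

        Beer(String name, float price) {
            this.name = name;
            this.price = price;
        }

        @Override
        public String toString() {
            return name + " " + price;
        }
    }

    static final List<Beer> BEERS = Arrays.asList(
            new Beer("Stella", 7.75f),
            new Beer("Obolon", 4.00f),
            new Beer("Sam Adams", 7.00f));

    // sorted() orders the elements flowing through the stream; collect() builds a new list
    static List<Beer> sortedCopy() {
        return BEERS.stream()
                .sorted(Comparator.comparing(b -> b.price))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Beer> sortedBeers = sortedCopy();
        System.out.println("original: " + BEERS);       // [Stella 7.75, Obolon 4.0, Sam Adams 7.0]
        System.out.println("sorted:   " + sortedBeers); // [Obolon 4.0, Sam Adams 7.0, Stella 7.75]
    }
}
```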

Parallel Streams and Sorting

If you decide to use parallelStream(), you can’t rely on the method forEach() to process the sorted data in order. Per Oracle documentation, “For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism.” You need to use the forEachOrdered() method instead.
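The following sketch contrasts the two methods on a parallel stream of the lesson’s beer names. The forEach() output can come out in any order, so no expected output is shown for it; forEachOrdered() processes the names one at a time in sorted order. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ParallelOrderDemo {

    static final List<String> NAMES = Arrays.asList(
            "Stella", "Sam Adams", "Obolon", "Bud Light",
            "Yuengling", "Leffe Blonde", "Chimay Blue", "Brooklyn Lager");

    // forEachOrdered() handles one element at a time in encounter order,
    // so appending to a plain ArrayList here is safe even on a parallel stream
    static List<String> sortedInParallel() {
        List<String> result = new ArrayList<>();
        NAMES.parallelStream()
                .sorted()
                .forEachOrdered(result::add);
        return result;
    }

    public static void main(String[] args) {
        // forEach(): the sorted names may be printed in any order
        NAMES.parallelStream().sorted().forEach(n -> System.out.println("forEach: " + n));

        // forEachOrdered(): the sorted order is respected
        sortedInParallel().forEach(n -> System.out.println("forEachOrdered: " + n));
    }
}
```

To simply gather sorted results, collect(Collectors.toList()) also preserves the encounter order of a parallel stream and is usually the more idiomatic choice.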

Other Stream Sources

So far, all the code samples in this chapter have used a collection of objects as a stream’s data source. But this doesn’t have to be the case. You can process both finite and infinite data sources with the Stream API.

Creating Finite Size Streams

You can take a bunch of arbitrary values and turn them into a stream using the Stream.of() method. For example, the following code snippet creates and prints a stream of strings of a finite size:

Stream<String> beerNames = Stream.of("Leffe Blonde",
        "Chimay Blue", "Sam Adams");

beerNames.forEach(System.out::println);

The method builder() is yet another way of creating finite-size streams. The following code snippet creates a stream of three long primitives and finds the maximum value. Because max() returns the OptionalLong type (it may or may not have a value), I call getAsLong() to get the primitive long. After running this code, maxValue is equal to 21.

long maxValue = LongStream.builder()
        .add(10)
        .add(21)
        .add(15)
        .build()
        .max().getAsLong();

Creating Infinite-Size Streams

Although data collections have finite size, you can use the Stream API for working with infinite streams of data.

Generating Stream Data

The method Stream.generate() can take a lambda expression that generates values by some arbitrary algorithm. This lambda has to be an implementation of the functional interface java.util.function.Supplier. Implementing a Supplier comes down to writing a function that takes no arguments and returns some result.

The following class StreamOfDates uses the class LocalDateTime, which is a part of the new Java 8 Date and Time API located in the package java.time. The supplier repeatedly sleeps for a second (1000 milliseconds) and then queries the system time using the method now(). The method Stream.generate() creates an infinite stream that receives the current time about once a second.

import java.time.LocalDateTime;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class StreamOfDates {

    public static void main(String[] args) {

        // Implementing a Supplier interface
        Supplier<LocalDateTime> myStopWatch = () -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return LocalDateTime.now(); // get the current time
        };

        // Generating a stream using lambda expression
        Stream<LocalDateTime> timeStream =
                Stream.generate(myStopWatch);

        timeStream.forEach(System.out::println);
    }
}

Running this program starts producing output that never stops and looks like this (the ellipsis at the end is not part of the output; it stands for the rest of the endless stream):

2014-09-18T17:41:36.017
2014-09-18T17:41:37.026
2014-09-18T17:41:38.027
2014-09-18T17:41:39.028
2014-09-18T17:41:40.028
2014-09-18T17:41:41.029
2014-09-18T17:41:42.029
2014-09-18T17:41:43.030
2014-09-18T17:41:44.031
2014-09-18T17:41:45.033
...

You see how to stop an infinite stream in the section Short-Circuit Operations.

Yet another way of generating an infinite stream is the method iterate(), which takes an initial value (the seed) and a rule for computing the next value from the previous one:

LongStream evenNumbers = LongStream.iterate(0, num -> num + 2);
evenNumbers.forEach(System.out::println);

The preceding code prints even numbers, but because they are being generated extremely fast, you might need to limit the number of generated values (see the section Short-Circuit Operations) to see the expected results.

Using Stream API with I/O Streams

As of Java 8, some of the classes used for processing I/O streams include new methods that allow data processing with the Stream API. For example, the class java.io.BufferedReader has a new method lines() that returns a Stream whose elements are the lines read from this BufferedReader object. As with other data sources, the Stream is lazily populated; that is, reading occurs only during the terminal operation.

You can see it in action by rewriting the class WebSiteReader from Lesson 16. That class was reading and printing the content of the web page www.google.com. The new version of this class is called WebSiteReaderStream.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;

public class WebSiteReaderStream {

    public static void main(String args[]) {
        URLConnection urlConn;

        try {
            // Assume index.html is a default home page name
            URL url = new URL("http://www.google.com");
            urlConn = url.openConnection();
        } catch (IOException e) {
            System.out.println(
                    "Can't connect to the provided URL:" + e.toString());
            return; // without a connection there is nothing to read
        }

        try (InputStreamReader inStream = new InputStreamReader(
                     urlConn.getInputStream(), "UTF8");
             BufferedReader buff = new BufferedReader(inStream)) {

            // Read and print the content of the Google home page
            // using Stream API
            buff.lines()
                    .forEach(System.out::println);

        } catch (IOException e1) {
            System.out.println("Can't read from the Internet: " +
                    e1.toString());
        }
    }
}

Not only does the reading part become simpler (just call lines()), but you can now add intermediate operations to filter the data as it comes in. For example, you can create a matching pattern using regular expressions (not covered in this book) and keep only the lines that match this pattern. You can research this further by finding examples that use the Java class Matcher.
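As a sketch of filtering lines while reading, the following example feeds BufferedReader.lines() from hypothetical in-memory text (a StringReader) instead of a network connection, and uses a plain String.contains() check in place of a regular expression. The class, constant, and method names are assumptions for the demo.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;

class LinesFilterDemo {

    // Hypothetical text standing in for data read from a URL or a file
    static final String PAGE =
            "Beer list\nUSA: Sam Adams\nBelgium: Stella\nUSA: Bud Light";

    // Keeps only the lines that contain the given text
    static List<String> linesContaining(String text, String needle) {
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            return reader.lines()
                    .filter(line -> line.contains(needle)) // applied to each line as it is read
                    .collect(Collectors.toList());          // the terminal operation triggers the reading
        } catch (IOException e) {
            // close() declares IOException; wrap it so callers need no throws clause
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(linesContaining(PAGE, "USA")); // [USA: Sam Adams, USA: Bud Light]
    }
}
```

With the WebSiteReaderStream class, the same filter() call could be inserted between buff.lines() and forEach().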

Short-Circuit Operations

In some cases you want to stop stream processing prematurely. Say you want to show only the first five elements from the stream. Short-circuit operations serve this purpose. Revisit the example that prints even numbers. With it you generated an infinite stream of even numbers. To print only the first five numbers you use the short-circuit operation limit():

LongStream evenNumbers = LongStream
        .iterate(0, num -> num + 2)
        .limit(5);

evenNumbers.forEach(System.out::println);

This code prints the following five numbers:

0
2
4
6
8

Another short-circuit operation is findFirst(), which returns the first element of the stream. This method returns an object of type Optional, which was introduced in Java 8 and helps you avoid NullPointerException when the requested object is not found: in that case, findFirst() returns an empty Optional object. The next code sample prints the first element from the beers collection:

Optional<Beer> firstBeer = beers.stream()
        .findFirst();

System.out.println("The first beer in collection: " +
        firstBeer.orElse(new Beer("No name", "No country", 0)));

Running this code against your collection of beers prints the following:

The first beer in collection: Name: Stella, price: 7.75

If your collection is empty, findFirst() returns an empty Optional, and println() prints the default Beer supplied by the orElse() method. An attempt to simply print firstBeer from an empty collection would output Optional.empty.
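The following sketch shows how findFirst() behaves on a non-empty and an empty list, including what printing the Optional itself looks like. The class name, the method name, and the list contents are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class FindFirstDemo {

    // findFirst() returns an Optional that is empty if the stream has no elements
    static Optional<String> firstName(List<String> names) {
        return names.stream().findFirst();
    }

    public static void main(String[] args) {
        List<String> cellar = Arrays.asList("Stella", "Sam Adams");
        List<String> emptyCellar = new ArrayList<>();

        System.out.println(firstName(cellar));                        // Optional[Stella]
        System.out.println(firstName(emptyCellar));                   // Optional.empty
        System.out.println(firstName(emptyCellar).orElse("No name")); // No name
        System.out.println(firstName(cellar).isPresent());            // true
    }
}
```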

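Some other short-circuit operations of the Stream interface are findAny(), allMatch(), noneMatch(), and anyMatch(). Note that skip(), which discards the first n elements, is not a short-circuit operation: applied to an infinite stream, it still produces an infinite stream.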
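Here is a sketch of the match operations. The hypothetical anyEvenAbove100() method applies anyMatch() to the infinite stream of even numbers and still terminates, because it stops at the first match. allMatch() and noneMatch() also stop early, but only when they find an element that settles the answer; on an infinite stream where that never happens they would run forever, so the sketch uses a finite list for them. All names are assumptions for the demo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.LongStream;

class ShortCircuitDemo {

    static final List<String> COUNTRIES = Arrays.asList("Belgium", "USA", "Ukraine", "USA");

    // anyMatch() stops at the first match, so it terminates even on an infinite stream
    static boolean anyEvenAbove100() {
        return LongStream.iterate(0, num -> num + 2)
                .anyMatch(num -> num > 100);
    }

    // allMatch() stops at the first element that fails the test ("Belgium" here)
    static boolean allFromUsa() {
        return COUNTRIES.stream().allMatch("USA"::equals);
    }

    // noneMatch() would stop at the first "France"; this list has none
    static boolean noneFromFrance() {
        return COUNTRIES.stream().noneMatch("France"::equals);
    }

    // findAny() may return any element; on a parallel stream it need not be the first one
    static boolean foundAny() {
        return COUNTRIES.stream().findAny().isPresent();
    }

    public static void main(String[] args) {
        System.out.println(anyEvenAbove100()); // true
        System.out.println(allFromUsa());      // false
        System.out.println(noneFromFrance());  // true
        System.out.println(foundAny());        // true
    }
}
```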

Try It

In the following assignments you need to use method references while working with streams. You also try using short-circuit operations.

Lesson Requirements

You should have Java installed.

NOTE You can download the code and resources for this “Try It” from the book’s web page at www.wrox.com/go/javaprog24hr2e. You can find them in the Lesson20.zip.

Step-by-Step

In this exercise you need to use static method references with streams.

1. Import the project Lesson20 into the Eclipse IDE.

2. In the class StreamOfDates, add a static method titled myStopWatchFunction() and move the code from the lambda expression myStopWatch there. This is how the lambda expression myStopWatch was originally implemented:

    Supplier<LocalDateTime> myStopWatch = () -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return LocalDateTime.now(); // get the current time
    };

3. Remove the declaration of the lambda expression Supplier myStopWatch from the class.

4. The existing invocation of the method generate() looks like this:

    Stream.generate(myStopWatch)

Replace the argument of the method generate() with the method reference StreamOfDates::myStopWatchFunction.

5. Run the program. It should start printing the current time endlessly.

6. Add the short-circuit operation limit() so the program stops after printing the current time five times.

7. Are limit() and findFirst() intermediate or terminal operations?

8. Modify the class Beer to include getters, and change the Comparator code samples to use method references instead of lambda expressions.

TIP Please select the videos for Lesson 20 online at www.wrox.com/go/javaprog24hr2e. You will also be able to download the code and resources for this lesson from the website.