
Node.js in Practice (2015)

Part 1. Node fundamentals

Chapter 5. Streams: Node’s most powerful and misunderstood feature

This chapter covers

· What streams are and how to use them

· How to use Node’s built-in streaming APIs

· The stream API used in Node 0.8 and below

· The stream primitive classes bundled since Node 0.10

· Strategies for testing streams

Streams are an event-based API for managing and modeling data, and are wonderfully efficient. By leveraging EventEmitter and Node’s non-blocking I/O libraries, the stream module allows data to be dynamically processed when it’s available, and then released when it’s no longer needed.

The idea of a stream of data isn’t new, but it’s an important concept and integral to Node. After chapter 4, mastering streams is the next step on the path to becoming truly competent at Node development.

The stream core module provides abstract tools for building event-based stream classes. It’s likely that you’ll use modules that implement streams, rather than creating your own. But to exploit streams to their fullest, it’s important to understand how they really work. This chapter has been designed with that goal in mind: understanding streams, working with Node’s built-in streaming APIs, and finally creating and testing your own streams. Despite the conceptually abstract nature of the stream module, once you’ve mastered the major concepts, you’ll start to see uses for streams everywhere.

The next section provides a high-level overview of streams and addresses the two APIs that Node supports as of Node 0.10.

5.1. Introduction to streams

In Node, streams are an abstract interface adhered to by several different objects. When we talk about streams, we’re referring to a way of doing things—in a sense, they’re a protocol. Streams can be readable or writable, and are implemented with instances of EventEmitter—see chapter 4 for more on events. Streams provide the means for creating data flows between objects, and can be composed with LEGO-like modularity.

5.1.1. Types of streams

Streams always involve I/O of some kind, and they can be classified into groups based on the type of I/O they deal with. The following types of streams were taken from James Halliday’s stream-handbook (https://github.com/substack/stream-handbook/), and will give you an idea of the wide array of things you can do with streams:

· Built-in —Many of Node’s core modules implement streaming interfaces; for example, fs.createReadStream.

· HTTP —Although technically network streams, there are streaming modules designed to work with various web technologies.

· Parsers —Historically parsers have been implemented using streams. Popular third-party modules for Node include XML and JSON parsers.

· Browser —Node’s event-based streams have been extended to work in browsers, offering some unique opportunities for interfacing with client-side code.

· Audio —James Halliday has written some novel audio modules that have streamable interfaces.

· RPC (Remote Procedure Call) —Sending streams over the network is a useful way to implement interprocess communication.

· Test —There are stream-friendly test libraries, and tools for testing streams themselves.

· Control, meta, and state —There are also more abstract uses of streams, and modules designed purely for manipulating and managing other streams.

The best way to understand why streams are important is to first consider what happens when data is processed without them. Let’s look at this in more detail by comparing Node’s asynchronous, synchronous, and stream-based APIs.

5.1.2. When to use streams

When reading a file synchronously with fs.readFileSync, the program will block, and all of the data will be read to memory. Using fs.readFile will prevent the program from blocking because it’s an asynchronous method, but it’ll still read the entire file into memory.

What if there were a way to tell fs.readFile to read a chunk of data into memory, process it, and then ask for more data? That’s where streams come in.

Memory becomes an issue when working with large files—compressed backup archives, media files, large log files, and so on. Instead of reading the entire file into memory, you could use fs.read with a suitable buffer, reading in a specific length at a time. Or, preferably, you could use the streams API provided by fs.createReadStream. Figure 5.1 illustrates how only a chunk of a file is read at a time with fs.createReadStream, compared to the entire file with fs.readFile.

Figure 5.1. Using streamable APIs means I/O operations potentially use less memory.

Streams are asynchronous by design. Rather than reading that entire file into memory, a buffer’s worth will be read, the desired operations will be performed, and then the result will be written to the output stream. This approach is as close to idiomatic Node as you can get. What’s more, streams are implemented with plain old JavaScript. Take fs.createReadStream—it offers a more scalable solution, but ultimately just wraps simple file system operations with a better API.
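
For example, a large file can be copied a chunk at a time by piping a readable file stream into a writable one. Here is a minimal sketch; the filenames are placeholders:

var fs = require('fs');

// Read the source a chunk at a time rather than loading it all into memory
var source = fs.createReadStream('big-input.log');
var destination = fs.createWriteStream('copy.log');

// pipe handles backpressure: reading pauses while the destination catches up
source.pipe(destination);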

Node’s streaming APIs feel idiomatic, yet streams have been around in computer science for a long time. This history is examined briefly in the next section to give you some background on where streams come from and where they’re used.

5.1.3. History

So where did streams originate? Historically, streams in computer science have been used to solve problems similar to streams in Node. For example, in C the standard way to represent a file is by using a stream. When a C program starts, it has access to the standard I/O streams. The standard I/O streams are also available in Node, and can be used to allow programs to work well with large amounts of data in the shell.

Traditionally, streams have been used to implement efficient parsers. This has also been the case in Node: the node-formidable module (https://github.com/felixge/node-formidable) is used by Connect to efficiently parse form data with streams, and database modules like the Node redis module (https://npmjs.org/package/redis) use streams to represent the connection to the server and respond by parsing on demand.

If you’re familiar with Unix, you’re probably already aware of streams. If you’ve used pipes or I/O redirection, then you’ve used streams. You can literally think about Node streams as you would Unix pipes—except data is filtered through functions instead of command-line programs. The next section explains how streams have evolved in Node, up until version 0.10 when they changed significantly.

Streams old and new

Streams are part of Node’s core modules, and as such remain backward compatible with earlier versions. As of this writing, Node is at version 0.10, which has seen significant changes in the streams API. Though it remains backward compatible, the new streams syntax is in some ways stricter than earlier versions, yet ultimately more flexible. This boils down to the behavior of pipe—pipes must now originate from a Readable stream and end at a Writable stream. The util.pump method, found in earlier versions of Node, has now been deprecated in favor of the new pipe semantics.

The evolution of streams in Node came from a desire to use the event-based APIs to solve non-blocking I/O problems in an efficient way. Older solutions like util.pump sought to find efficiency in intelligent uses of “drain” events—this is emitted when a writable stream has emptied and it’s safe to write again. This sounds a lot like pausing a stream, and the handling of paused streams was something the pre-0.10 streams API couldn’t handle effectively.

Now Node has reached a point where the core developers have seen the types of problems people are tackling with streams, so the new API is richer thanks to the new stream primitive classes. Table 5.1 shows a summary of the classes available from Node 0.10 onward.

Table 5.1. A summary of the classes available in streams2

Name | User methods | Description
stream.Readable | _read(size) | Used for I/O sources that generate data
stream.Writable | _write(chunk, encoding, callback) | Used to write to an underlying output destination
stream.Duplex | _read(size), _write(chunk, encoding, callback) | A readable and writable stream, like a network connection
stream.Transform | _flush(callback), _transform(chunk, encoding, callback) | A duplex stream that changes data in some way, with no limitation on matching input data size with the output

Learning to take advantage of streams will pay dividends when it comes to working with third-party modules that implement streams. In the next section, a selection of popular stream-oriented modules is examined.

5.1.4. Streams in third-party modules

The main use of streams in Node is for creating event-based APIs for I/O-like sources; parsers, network protocols, and database modules are the key examples. A network protocol implemented with streams can be convenient when composition is desired—think about how easy it would be to add data compression to a network protocol if the data could be passed through the gzip module with a single call to pipe.

Similarly, database libraries that stream data can handle large result sets more efficiently; rather than collecting all results into an array, a single item at a time can be streamed.

The Mongoose MongoDB module (http://mongoosejs.com/) has an object called QueryStream that can be used to stream documents. The mysql module (https://npmjs.org/package/mysql) can also stream query results, although this implementation doesn’t currently implement the stream.Readable class.

You can also find more creative uses of streams out there. The baudio module (see figure 5.2) by James Halliday can be used to generate audio streams that behave just like any other stream—audio data can be routed to other streams with pipe, and recorded for playback by standard audio software:

Figure 5.2. The baudio module by James Halliday (substack) supports the generation of audio streams (from https://github.com/substack/baudio).

var baudio = require('baudio');

var n = 0;
var b = baudio(function (t) {
  var x = Math.sin(t * 262 + Math.sin(n));
  n += Math.sin(t);
  return x;
});

b.play();

When selecting a network or database library for your Node projects, we strongly recommend ensuring it has a streamable API, because it’ll help you write more elegant code while also potentially offering performance benefits.

One thing all stream classes have in common is they inherit from EventEmitter. The significance of this is investigated in the next section.

5.1.5. Streams inherit from EventEmitter

Each of the stream module base classes emits various events, which depend on whether the base class is readable, writable, or both. The fact that streams inherit from EventEmitter means you can bind to various standard events to manage streams, or create your own custom events to represent more domain-specific behavior.

When working with stream.Readable instances (see table 5.2 for guidance on selecting a stream base class), the readable event is important because it signifies that the stream is ready for calls to stream.read().

Table 5.2. Selecting a streams base class

Problem | Solution
You want to wrap around an underlying I/O source with a streamable API. | Readable
You want to get output from a program to use elsewhere, or send data elsewhere within a program. | Writable
You want to change data in some way by parsing it. | Transform
You want to wrap a data source that can also receive messages. | Duplex
You want to extract data from streams without changing it, from testing to analysis. | PassThrough

Attaching a listener to the data event will cause the stream to behave like the old streams API, where data is passed to data listeners as it becomes available, rather than through calls to stream.read().

The error event is covered in detail in technique 28. It’ll be emitted if the stream encounters an error when receiving data.

The end event signifies that the stream has received an equivalent of the end-of-file character, and won’t receive more data. There’s also a close event that represents the case where the underlying resource has been closed, which is distinct from end, and the Node API documentation notes that not all streams will emit this event, so a rule of thumb is to bind to end.

The stream.Writable class changes the semantics for signifying the end of a stream to close and finish. The distinction between the two is that finish is emitted when writable.end() is called, whereas close means the underlying I/O resource has been closed, which isn’t always required, depending on the nature of the underlying stream.

The pipe and unpipe events are emitted on the destination stream when a stream is passed to the stream.Readable.prototype.pipe method. This can be used to adapt the way a stream behaves when it’s piped. The listener receives the stream at the other end of the pipe as its first argument, so this value could be inspected to change the behavior of the stream. This is a more advanced technique that’s covered in technique 37.

About the techniques in this chapter

The techniques in this chapter all use the streams2 API. This is the nickname of the newer API style found in Node 0.10 and 0.12. If you’re using Node 0.8, forward compatibility is supported through the readable-stream module (https://github.com/isaacs/readable-stream).

In the next section you’ll learn how to solve real-world problems using streams. First we’ll discuss some of Node’s built-in streams, and then we’ll move on to creating entirely new streams and testing them.

5.2. Built-in streams

Node’s core modules themselves are implemented using the stream module, so it’s easy to start using streams without having to build your own classes. The next technique introduces some of this functionality through file system and network streaming APIs.

Technique 27 Using built-in streams to make a static web server

Node’s core modules often have streamable interfaces. They can be used to solve many problems more efficiently than their synchronous alternatives.

Problem

You want to send a file from a web server to a client in an efficient manner that will scale up to large files.

Solution

Use fs.createReadStream to open a file and stream it to the client. Optionally, pipe the resulting stream.Readable through another stream to handle features like compression.

Discussion

Node’s core modules for file system and network operations, fs and net, both provide streamable interfaces. The fs module has helper methods to automatically create instances of the streamable classes. This makes using streams for some I/O-based problems fairly straightforward.

To understand why streams are important and compare them to nonstreaming code, consider the following example of a simple static file web server made with Node’s core modules:

var http = require('http');
var fs = require('fs');

http.createServer(function(req, res) {
  fs.readFile(__dirname + '/index.html', function(err, data) {
    if (err) {
      res.statusCode = 500;
      res.end(String(err));
    } else {
      res.end(data);
    }
  });
}).listen(8000);

Even though this code uses the fs.readFile method, which is non-blocking, it can easily be improved on by using fs.createReadStream, because fs.readFile reads the entire file into memory. This might seem acceptable with small files, but what if you don’t know how large the file is? Static web servers often have to serve up potentially large binary assets, so a more adaptable solution is desirable.

The following listing demonstrates a streaming static web server.

Listing 5.1. A simple static web server that uses streams
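
A minimal sketch of such a server, assuming an index.html file next to the script:

var http = require('http');
var fs = require('fs');

http.createServer(function(req, res) {
  // Stream the file instead of buffering it all in memory
  fs.createReadStream(__dirname + '/index.html').pipe(res);
}).listen(8000);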

This example uses less code than the first version, and improves its efficiency. Now instead of reading the entire file into memory, a buffer’s worth will be read at a time and sent to the client. If the client is on a slow connection, the network stream will signal this by requesting that the I/O source pauses until the client is ready for more data. This is known as backpressure, and is one of the additional benefits using streams brings to your Node programs.

We can take this example a step further. Streams aren’t just efficient and potentially more syntactically elegant, they’re also extensible. Static web servers often compress files with gzip. The next listing adds that to the previous example, using streams.

Listing 5.2. A static web server with gzip
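
A sketch of the gzip version, again assuming an index.html file next to the script (a production server would first check the client’s Accept-Encoding header):

var http = require('http');
var fs = require('fs');
var zlib = require('zlib');

http.createServer(function(req, res) {
  res.writeHead(200, { 'content-encoding': 'gzip' });
  // Pipe the file through a gzip Transform stream and then to the response
  fs.createReadStream(__dirname + '/index.html')
    .pipe(zlib.createGzip())
    .pipe(res);
}).listen(8000);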

Now if you open http://localhost:8000 in a browser and use its debugging tools to look at the network operations, you should see that the content was transferred using gzip. Figure 5.3 shows what our browser reported after running the example.

Figure 5.3. The network inspector confirms the content was compressed.

This could be expanded in several other ways—you can use as many calls to pipe as required. For example, the file could be piped through an HTML templating engine and then compressed. Just remember that the general pattern is readable.pipe(writable).

Note that this example is simplified to illustrate how streams work and isn’t sufficient for implementing a production HTTP asset server.

Now that you’ve seen a fleshed-out example of how streams are used to solve a common problem, it’s time to look at another piece of the puzzle: error handling.

Technique 28 Stream error handling

The stream classes inherit from EventEmitter, which means sane error handling comes as standard. This technique explains how to handle errors generated by a stream.

Problem

You want to catch errors generated by a stream.

Solution

Add an error listener.

Discussion

The standard behavior of EventEmitter is to throw an exception when an error event is emitted—unless there’s a listener attached to the error event. The first argument to the listener will be the error that was raised, a descendant of the Error object.

The following listing shows an example of an intentionally generated error with a suitable error listener.

Listing 5.3. Catching errors during streaming
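
A minimal sketch of the idea: open a file that doesn’t exist and attach an error listener:

var fs = require('fs');

// Intentionally refer to a file that doesn't exist
var stream = fs.createReadStream('not-found.txt');

stream.on('error', function(err) {
  console.trace();                      // show where the error was emitted
  console.error('Stream error:', err);
});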

Here we attempt to open a file that doesn’t exist, causing an 'error' event to be triggered. The error object passed to the handler will usually have extra information to aid in tracking down the error. For example, the stack property may have line number information, and console.trace() can be called to generate a full stack trace. In listing 5.3 console.trace() will show a trace up to the ReadStream implementation in Node’s events.js core module. That means you can see exactly where the error was originally emitted.

Now that you’ve seen how some of Node’s core modules use streams, the next section explores how third-party modules use them.

5.3. Third-party modules and streams

Streams are about as idiomatic Node as you can get, so it’s no surprise that streamable interfaces crop up all over the open source Node landscape. In the next technique you’ll learn how to use streamable interfaces found in some popular Node modules.

Technique 29 Using streams from third-party modules

Many open source developers have recognized the importance of streams and incorporated streamable interfaces into their modules. In this technique you’ll learn how to identify such implementations and use them to solve problems more efficiently.

Problem

You want to know how to use streams with a popular third-party module that you’ve downloaded with npm.

Solution

Look at the module’s documentation or source code to figure out if it implements a streamable API, and if so, how to use it.

Discussion

We’ve picked three popular modules as examples of third-party modules that implement streamable interfaces. This guided tour of streams in the wild should give you a good idea of how developers are using streams, and how you can exploit streams in your own projects.

In the next section you’ll discover some key ways to use streams with the popular web framework, Express.

Using streams with Express

The Express web framework (http://expressjs.com/) actually provides a relatively lightweight wrapper around Node’s core HTTP module. This includes the Request and Response objects. Express decorates these objects with some of its own methods and values, but the underlying objects are the same. That means everything you learned about streaming data to browsers in technique 27 can be reused here.

A simple example of an Express route—a callback that runs for a given HTTP method and URL—uses res.send to respond with some data:

var express = require('express');
var app = express();

app.get('/', function(req, res) {
  res.send('hello world');
});

app.listen(3000);

The res object is actually a response object, and inherits from Node’s http.ServerResponse. In technique 27 you saw that data can be streamed to HTTP responses by using the pipe method. Express is built in a way that allows buffers and objects to work with the res.send method, and for streams you can still use the pipe method.

Listing 5.4 is an Express web application that will run with Express 3 and streams content from a custom-readable stream by using pipe.

Listing 5.4. An Express application that uses streams
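
A sketch of the idea, using a hypothetical StatStream readable stream that reports memory usage (Express 3-style API assumed):

var express = require('express');
var stream = require('stream');
var util = require('util');

// A readable stream that emits a few chunks of memory usage data, then ends
function StatStream(limit) {
  stream.Readable.call(this);
  this.limit = limit;
}
util.inherits(StatStream, stream.Readable);

StatStream.prototype._read = function() {
  if (this.limit === 0) {
    this.push(null);                                  // no more data
  } else {
    this.limit--;
    this.push(util.inspect(process.memoryUsage()) + '\n');
  }
};

var app = express();

app.get('/', function(req, res) {
  var statStream = new StatStream(10);
  statStream.pipe(res);                               // res is writable, so pipe just works
});

app.listen(3000);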

Our custom readable stream, StatStream, inherits from stream.Readable and implements the _read method, which just sends memory usage data. The _read method must be implemented whenever you want to make a readable stream. When sending the response back to the browser, the stream can be piped to the res object provided by Express without any extra work.

The implementation of the send module that comes with Express 3 uses fs.createReadStream, as described in technique 27. The following sample code is taken from the source to send:

SendStream.prototype.stream = function(path, options){
  // TODO: this is all lame, refactor meeee
  var self = this;
  var res = this.res;
  var req = this.req;

  // pipe
  var stream = fs.createReadStream(path, options);
  this.emit('stream', stream);
  stream.pipe(res);

It takes a lot more work to correctly deal with things like HTTP Content-Range headers, but this snippet demonstrates that leveraging the built-in streaming APIs like fs.createReadStream can lead to solutions powerful enough to underpin major open source projects.

Using streams with Mongoose

The Mongoose module (http://mongoosejs.com/) for the MongoDB database server (http://www.mongodb.org/) has an interface called QueryStream that provides Node 0.8-style streams for query results. This class is used internally to allow query results to be streamed using the stream method. The following code shows a query that has its results piped through a hypothetical writable stream:

User
  .where('role')
  .equals('admin')
  .stream()
  .pipe(writeStream);

This pattern—using a class to wrap an external I/O source’s streamable behavior, and then exposing streams through simple method calls—is the style employed by Node’s core modules, and is popular with third-party module authors. This has been made clearer by the streams2 API’s use of simple abstract classes that can be inherited from.

Using streams with MySQL

The third-party mysql module (https://npmjs.org/package/mysql) is often seen by Node developers as something low-level that should be built on with more complex libraries, like the Sequelize (http://www.sequelizejs.com/) object-relational mapper (ORM). But the mysql module itself shouldn’t be underestimated, and supports streaming results with pause and resume. Here’s an example of the basic API style:

var query = connection.query('SELECT * FROM posts');

query.on('result', function(row) {
  connection.pause();
  processRow(row, function() {
    connection.resume();
  });
});

This streaming API uses domain-specific event names—there’s also a 'fields' event. To pause the result stream, connection.pause must be called. This signals to the underlying connection to MySQL that results should stop briefly until the receiver is ready for more data.

Summary

In this technique you’ve seen how some popular third-party modules use streams. They’re all characterized by the fact that they deal with I/O—HTTP and database connections alike can involve both network connections and file system operations. In general, it’s a good idea to look for Node network and database modules that implement streamable interfaces, because they help you scale programs and also write them in a readable, idiomatic style.

Now that you’ve seen how to use streams, you’re probably itching to learn how to create your own. The next section has a technique for using each base stream class, and also shows how to correctly inherit from them.

5.4. Using the stream base classes

Node’s base stream classes provide templates for solving the kinds of problems that streams are best at. For example, stream.Transform is great for parsing data, and stream.Readable is perfect for wrapping lower-level APIs with a streamable interface.

The next technique explains how to inherit from the stream base classes, and then further techniques go into detail about how to use each base class.

Technique 30 Correctly inheriting from the stream base classes

Node’s base classes for streams can be used as a starting point for new modules and subclasses. It’s important to understand what problems each solves, and how to correctly inherit from them.

Problem

You want to solve a problem by creating a streamable API, but you’re not sure which base class to use and how to use it.

Solution

Decide on which base class most closely matches the problem at hand, and inherit from it by calling the base class’s constructor with .call(this) and setting up the prototype chain with util.inherits (or Object.create).

Discussion

Node’s base classes for streams, already summarized in table 5.1, should be used as the basis for your own streamable classes or modules. They’re abstract classes, which means they have methods that you must implement before they can be used. This is usually done through inheritance.

All of the stream base classes are found in the stream core module. The five base classes are Readable, Writable, Duplex, Transform, and PassThrough. Fundamentally, streams are either readable or writable, but Duplex streams are both. This makes sense if you consider the behavior of I/O interfaces—a network connection can be both readable and writable. It wouldn’t be particularly useful, for example, if ssh were only able to send data.

Transform streams build on Duplex streams, but also change the data in some way. Some of Node’s built-in modules use Transform streams, so they’re fundamentally important. An example of this is the crypto module.

Table 5.2 offers some hints to help you choose which base class to use.

Inheriting from the base classes

If you’ve learned about inheritance in JavaScript, you might be tempted to inherit from the stream base classes by using MyStream.prototype = new stream.Readable();. This is considered bad practice, and it’s better to use the ECMAScript 5 Object.create pattern instead. Also, the base class’s constructor must be run, because it provides essential setup code. The pattern for this is shown next.

Listing 5.5. Inheriting from the stream.Readable base class
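
A sketch of the pattern, using a hypothetical MyStream class: the Readable constructor is called so the options argument is handled, and Object.create sets up the prototype chain:

var Readable = require('stream').Readable;

function MyStream(options) {
  Readable.call(this, options);                 // run the base class's setup code
}

// Set up the prototype chain without instantiating Readable
MyStream.prototype = Object.create(Readable.prototype, {
  constructor: { value: MyStream }
});

MyStream.prototype._read = function(size) {
  // Fetch data from the underlying source here and call this.push(data),
  // or this.push(null) when there is nothing left
  this.push(null);
};

Calling util.inherits(MyStream, Readable) would achieve the same thing as the Object.create call shown here.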

Node includes a utility method called util.inherits that can be used instead of Object.create, but both approaches are widely used by Node developers. This example uses the Object.create method instead so you can see what util.inherits does.

Note that in listing 5.5 the options argument is passed to the original Readable constructor. This is important because there’s a standard set of options that Node supports for configuring streams. In the case of Readable, the options are as follows:

· highWaterMark —The number of bytes to store in the internal buffer before pausing reading from the underlying data source.

· encoding —Causes the buffer to be automatically decoded. Possible values include utf8 and ascii.

· objectMode —Allows the stream to behave as a stream of objects, rather than bytes.

The objectMode option allows JavaScript objects to be handled by streams. An example of this has been provided in technique 31.

Summary

In this technique you’ve seen how to use Node’s stream base classes to create your own stream implementations. This involves using util.inherits to set up the class, and then .call to call the original constructor. We also covered some of the options that these base classes use.

Properly inheriting from the base classes is one thing, but what about actually implementing a stream class? Technique 31 explains this in more detail for the Readable base class, but in that specific case it involves implementing a method called _read to read data from the underlying data source and push it onto an internal queue managed by the base class itself.

Technique 31 Implementing a readable stream

Readable streams can be used to provide a flexible API around I/O sources, and can also act as parsers.

Problem

You’d like to wrap an I/O source with a streamable API that provides a higher-level interface than would otherwise be possible with the underlying data.

Solution

Implement a readable stream by inheriting from the stream.Readable class and creating a _read(size) method.

Discussion

Implementing a custom stream.Readable class can be useful when a higher level of abstraction around an underlying data source is required. For example, I (Alex) was working on a project where the client had sent in JSON files that contained millions of records separated by newlines. I decided to write a quick stream.Readable class that read a buffer’s worth of data, and whenever a newline was encountered, JSON.parse was used to parse the record.

One way of using stream.Readable to parse newline-separated JSON records is shown next.

Listing 5.6. A JSON line parser
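
A sketch of such a parser. It differs in detail from the original listing, but follows the approach described next: buffer incoming data, split it on newlines, and emit each parsed record with a custom object event (json-lines.txt is a placeholder input file of newline-separated JSON records):

var stream = require('stream');
var util = require('util');
var fs = require('fs');

function JSONLineReader(source) {
  stream.Readable.call(this);
  this._source = source;
  this._buffer = '';

  var self = this;
  // Start pulling data whenever the source has something to read
  source.on('readable', function() {
    self.read();
  });
}
util.inherits(JSONLineReader, stream.Readable);

JSONLineReader.prototype._read = function(size) {
  var chunk = this._source.read();
  if (chunk) {
    this._buffer += chunk;
  }

  var lineIndex = this._buffer.indexOf('\n');

  // Process every complete line currently in the buffer
  while (lineIndex !== -1) {
    var line = this._buffer.slice(0, lineIndex);
    this._buffer = this._buffer.slice(lineIndex + 1);

    if (line) {
      var record = JSON.parse(line);
      this.emit('object', record);           // custom event carrying the parsed record
      this.push(JSON.stringify(record));     // also queue it for any piped consumers
    }

    lineIndex = this._buffer.indexOf('\n');
  }
};

var input = fs.createReadStream(__dirname + '/json-lines.txt', { encoding: 'utf8' });
var reader = new JSONLineReader(input);

reader.on('object', function(obj) {
  console.log('parsed record:', obj);
});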

Listing 5.6 uses a constructor function, JSONLineReader, that inherits from stream.Readable to read and parse lines of JSON from a file. The source for JSONLineReader will itself be a readable stream, so a listener is bound to the readable event so instances of JSONLineReader know when to start reading data.

The _read method checks whether the buffer is empty and, if so, reads more data from the source and adds it to the internal buffer. Then the current line index is incremented, and if a line ending is found, the first line is sliced from the buffer. Once a complete line has been found, it’s parsed and emitted using the object event—users of the class can bind to this event to receive each line of JSON that’s found in the source stream.

When this example is run, data from a file will flow through an instance of the class. Internally, data will be queued. Whenever source.read is executed, the latest “chunk” of data will be returned, so it can be processed when JSONLineReader is ready for it. Once enough data has been read and a newline has been found, the data will be split up to the first newline, and then the result will be collected by calling this.push.

Once this.push is called, stream.Readable will queue the result and forward it on to a consuming stream. This allows the stream to be further processed by a writable stream using pipe. In this example JSON objects are emitted using a custom object event. The last few lines of this example attach an event listener for this event and process the results.

The size argument to Readable.prototype._read is advisory. That means the underlying implementation can use it to know how much data to fetch—this isn’t always needed, so you won’t always use it. In the previous example we parsed the entire line, but some data formats could be parsed in chunks, in which case the size argument would be useful.

In the original code that I based this example on, I used the resulting JSON objects to populate a database. The data was also redirected and gzipped into another file. Using streams made this both easy to write and easy to read in the final project.

The example in listing 5.6 used strings, but what about objects? Most streams that deal directly with I/O—files, network protocols, and so on—will use raw bytes or strings of characters. But sometimes it’s useful to create streams of JavaScript objects. Listing 5.7 shows how to safely inherit from stream.Readable and pass the objectMode option to set up a stream that deals with JavaScript objects.

Listing 5.7. A stream configured to use objectMode
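
A sketch of a readable stream in objectMode that pushes process.memoryUsage() objects (the three-reading limit is just to keep the example finite):

var Readable = require('stream').Readable;
var util = require('util');

function MemoryStream() {
  Readable.call(this, { objectMode: true });    // stream JavaScript objects, not bytes
  this.remaining = 3;                           // emit a few readings, then end
}
util.inherits(MemoryStream, Readable);

MemoryStream.prototype._read = function() {
  if (this.remaining === 0) {
    this.push(null);                            // signal the end of the stream
  } else {
    this.remaining--;
    this.push(process.memoryUsage());           // push a plain object
  }
};

var memoryStream = new MemoryStream();

memoryStream.on('readable', function() {
  var usage;
  while ((usage = memoryStream.read()) !== null) {
    console.log('heap used: %d bytes', usage.heapUsed);
  }
});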

The MemoryStream example in listing 5.7 uses objects for data, so objectMode is passed to the Readable constructor as an option. Then process.memoryUsage is used to generate some suitable data. When an instance of this class emits readable, indicating that it’s ready to be read from, the memory usage data is logged to the console.

When using objectMode, the underlying behavior of the stream is changed to remove the internal buffer merge and length checks, and to ignore the size argument when reading and writing.

Technique 32 Implementing a writable stream

Writable streams can be used to output data to underlying I/O sinks.

Problem

You want to output data from a program using an I/O destination that you want to wrap with a streamable interface.

Solution

Inherit from stream.Writable and implement a _write method to send data to the underlying resource.

Discussion

As you saw in technique 29, many third-party modules offer streamable interfaces for network services and databases. Following this trend is advantageous because it allows your classes to be used with the pipe API, which helps keep chunks of code reusable and decoupled.

You might be simply looking to implement a writable stream to act as the destination of a pipe chain, or to implement an unsupported I/O resource. In general, all you need to do is correctly inherit from stream.Writable—for more on the recommended way to do this, see technique 30—and then add a _write method.

All the _write method needs to do is call a supplied callback when the data has been written. The following code shows the method’s arguments and the overall structure of a sample _write implementation:
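
A sketch of the shape such a method takes; customWriteOperation is a stand-in for whatever underlying I/O the stream wraps:

var Writable = require('stream').Writable;
var util = require('util');

function MyWritable(options) {
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable);

MyWritable.prototype._write = function(chunk, encoding, callback) {
  // customWriteOperation stands in for the real underlying I/O
  customWriteOperation(chunk, function(err) {
    callback(err);                     // report success or failure to the stream machinery
  });
};

// A stand-in "I/O" operation, just for the sake of the example
function customWriteOperation(chunk, callback) {
  process.stdout.write(chunk, callback);
}

process.stdin.pipe(new MyWritable());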

A _write method supplies a callback that you can call when writing has finished. This allows _write to be asynchronous. The customWriteOperation method is simply used as an example here—in a real implementation it would perform the underlying I/O. This could involve talking to a database using sockets, or writing to a file. The first argument provided to the callback should be an error, allowing _write to propagate errors if needed.

Node’s stream.Writable base class doesn’t need to know how the data was written, it just cares whether the operation succeeded or failed. Failures can be reported by passing an Error object to callback. This will cause an error event to be emitted. Remember that these stream base classes descend from EventEmitter, so you should usually add a listener to error to catch and gracefully handle any errors.

The next listing shows a complete implementation of a stream.Writable class.

Listing 5.8. An example implementation of a writable stream
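
A sketch of a writable stream along these lines; it wraps stdout and writes whatever it receives in green:

var Writable = require('stream').Writable;
var util = require('util');

function GreenStream(options) {
  Writable.call(this, options);
}
util.inherits(GreenStream, Writable);

GreenStream.prototype._write = function(chunk, encoding, callback) {
  // Wrap each chunk in ANSI escape codes for green text
  process.stdout.write('\u001b[32m' + chunk + '\u001b[39m', callback);
};

// Usage: node writable.js, or cat file.txt | node writable.js
process.stdin.pipe(new GreenStream());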

This short example changes input text into green text. It can be used by running it with node writable.js, or by piping text through it with cat file.txt | node writable.js.

Although this is a trivial example, it illustrates how easy it is to implement streamable classes, so you should consider doing this the next time you want to make something that stores data work with pipe.

Chunks and encodings

The encoding argument to write is only relevant when strings are being used instead of buffers. Strings can be used by setting decodeStrings to false in the options that are passed when instantiating a writable stream.

Streams don’t always deal with Buffer objects because some implementations have optimized handling for strings, so dealing directly with strings can be more efficient in certain cases.

Technique 33 Transmitting and receiving data with duplex streams

Duplex streams allow data to be transmitted and received. This technique shows you how to create your own duplex streams.

Problem

You want to create a streamable interface to an I/O source that needs to be both readable and writable.

Solution

Inherit from stream.Duplex and implement _read and _write methods.

Discussion

Duplex streams are a combination of the Writable and Readable streams, which are explained in techniques 31 and 32. As such, Duplex streams require inheriting from stream.Duplex and implementations for the _read and _write methods. Refer to technique 30 for an explanation of how to inherit from the stream base classes.

Listing 5.9 shows a small stream.Duplex class that reads and writes data from stdin and stdout. It prompts for data and then writes it back out with ANSI escape codes for colors.

Listing 5.9. A duplex stream
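
A sketch of a duplex stream like the one described, which prompts for input and echoes it back wrapped in ANSI color codes:

var Duplex = require('stream').Duplex;
var util = require('util');

function HungryStream(options) {
  Duplex.call(this, options);
  this.waiting = false;                          // are we waiting for user input?
}
util.inherits(HungryStream, Duplex);

HungryStream.prototype._write = function(chunk, encoding, callback) {
  this.waiting = false;
  // Push the received data back out, wrapped in ANSI color codes
  this.push('\u001b[32m' + chunk + '\u001b[39m');
  callback();
};

HungryStream.prototype._read = function(size) {
  if (!this.waiting) {
    this.push('Feed me some data! > ');          // acts as the prompt
    this.waiting = true;
  }
};

var hungryStream = new HungryStream();
process.stdin.pipe(hungryStream).pipe(process.stdout);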

The HungryStream class in listing 5.9 will display a prompt, wait for input, and then return the input with ANSI color codes. To track the state of the prompt, an internal property called waiting is used. The _write method, which will be called by Node automatically, sets the waiting property to false, indicating that input has been received, and then the data is pushed to the internal buffer with color codes attached. Finally, the callback that gets automatically passed to _write is executed.

When the class is waiting for data, the _read method pushes a message that acts as the prompt. This can be made interactive by piping the standard input stream through an instance of HungryStream and then back out through the standard output stream.

The great thing about duplex streams is they can sit in the middle of pipes. A simpler way to do this is to use the stream.PassThrough base class, which only relays data, allowing you to plug into the middle of a pipe and track data as it flows through it. The diagram in figure 5.4 shows how chunks of data flow through the duplex stream object, from the input to the output stream.

Figure 5.4. A duplex stream

Several stream.Duplex implementations in the wild implement a _write method but keep the _read method as a blank stub. This is purely to take advantage of duplex streams as something that can enhance the behavior of other streams through pipes. For example, hiccup by Naomi Kyoto (https://github.com/naomik/hiccup) can be used to simulate slow or sporadic behavior of underlying I/O sources. This novel use of streams comes in handy when you’re writing automated tests.

Duplex streams are useful for piping readable streams to writable streams and analyzing the data. Transform streams are specifically designed for changing data; the next technique introduces stream.Transform and the _transform method.

Technique 34 Parsing data with transform streams

Streams have long been used as a way to create efficient parsers. The stream.Transform base class can be used to do this in Node.

Problem

You want to use streams to change data into another format in a memory-efficient manner.

Solution

Inherit from stream.Transform and implement the _transform method.

Discussion

On the surface, transform streams sound a little bit like duplex streams. They can also sit in the middle of a pipe chain. The difference is that they’re expected to transform data, and they’re implemented by writing a _transform method. This method’s signature is similar to _write—it takes three arguments, chunk, encoding, and callback. The callback should be executed when the data has been transformed, which allows transform streams to parse data asynchronously.

Listing 5.10 shows a transform stream that parses (albeit simplified) CSV data. The CSV is expected to contain comma-separated values without extra spaces or quotes, and should use Unix line endings.

Listing 5.10. A CSV parser implemented using a transform stream
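
A sketch of such a parser, following the approach described next: walk each chunk character by character, collect values between commas, treat the first line as headers, and push a JSON representation of every subsequent line (sample.csv is a placeholder input file):

var Transform = require('stream').Transform;
var util = require('util');
var fs = require('fs');

function CSVParser(options) {
  Transform.call(this, options);
  this.value = '';          // the field currently being read
  this.headers = [];        // header names taken from the first line
  this.values = [];         // field values for the current line
  this.line = 0;            // current line number
}
util.inherits(CSVParser, Transform);

CSVParser.prototype._transform = function(chunk, encoding, callback) {
  chunk = chunk.toString();

  for (var i = 0; i < chunk.length; i++) {
    var c = chunk.charAt(i);

    if (c === ',') {
      this.addValue();
    } else if (c === '\n') {
      this.addValue();
      if (this.line > 0) {
        this.push(JSON.stringify(this.toObject()) + '\n');
      }
      this.values = [];
      this.line++;
    } else {
      this.value += c;
    }
  }

  callback();               // parsing here is synchronous, so this is safe
};

CSVParser.prototype.addValue = function() {
  if (this.line === 0) {
    this.headers.push(this.value);
  } else {
    this.values.push(this.value);
  }
  this.value = '';
};

CSVParser.prototype.toObject = function() {
  var obj = {};
  this.headers.forEach(function(header, i) {
    obj[header] = this.values[i];
  }, this);
  return obj;
};

var parser = new CSVParser();
fs.createReadStream(__dirname + '/sample.csv').pipe(parser).pipe(process.stdout);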

Parsing CSV involves tracking several variables—the current value, the headers for the file, and the current line number. To do this, a stream.Transform descendant with suitable properties can be used. The _transform implementation is the most complex part of this example. It receives a chunk of data, which is iterated over one character at a time using a for loop. If the character is a comma, the current value is saved (if there is one). If the current character is a newline, the line is transformed into a JSON representation. This example is synchronous, so it’s safe to execute the callback supplied to _transform at the end of the method. A toObject method has been included to make it easier to change the internal representation of the headers and values into a JavaScript object.

The last line in the example creates a readable file stream of CSV data and pipes it through the CSV parser, and then the output is piped to stdout so the results can be viewed. This could also be piped through a compression module to directly support compressed CSV files, or anything else you can think of doing with pipe and streams.

This example doesn’t implement all of the things real-world CSV files can contain, but it does show that building streaming parsers with stream.Transform isn’t too complicated, depending on the file format or protocol.

Now that you’ve learned how to use the base classes, you’re probably wondering what the options argument in listing 5.10 was used for. The next section includes some details on how to use options to optimize stream throughput, and details some more advanced techniques.

5.5. Advanced patterns and optimization

The stream base classes accept various options for tailoring their behavior, and some of these options can be used to tune performance. This section has techniques for optimizing streams, using the older streams API, adapting streams based on input, and testing streams.

Technique 35 Optimizing streams

Built-in streams and the classes used to build custom streams allow the internal buffer size to be configured. It’s useful to know how to optimize this value to attain the desired performance characteristics.

Problem

You want to read data from a file, but are concerned about either speed or memory performance.

Solution

Optimize the stream’s buffer size to suit your application’s requirements.

Discussion

The built-in stream functions take a buffer size parameter, which allows the performance characteristics to be tailored to a given application. The fs.createReadStream method takes an options argument that can include a bufferSize property. This option is passed to stream.Readable, so it’ll control the internal buffer used to temporarily store file data before it’s used elsewhere.

The stream created by zlib.createGzip is an instance of stream.Transform, and the Zlib class creates its own internal buffer object for storing data. Controlling the size of this buffer is also possible, but this time the options property is chunkSize. Node’s documentation has a section on optimizing the memory usage of zlib,[1] based on the documentation in the zlib/zconf.h header file, which is part of the low-level source code used to implement zlib itself.

1 See “Memory Usage Tuning”—http://nodejs.org/docs/latest/api/all.html#all_process_memoryusage.

In practice it’s quite difficult to push Node’s streams to exhibit different CPU performance characteristics based on buffer size. But to illustrate the concept, we’ve included a small benchmarking script that includes some interesting ideas about measuring stream performance. The next listing attempts to gather statistics on memory and elapsed time.

Listing 5.11. Benchmarking streams
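
A condensed sketch of this kind of benchmark: it gzips an input file with progressively larger buffers and samples memory usage while the streams run. The input and output filenames are placeholders, and highWaterMark/chunkSize are used here to size the buffers:

var fs = require('fs');
var zlib = require('zlib');

function benchStream(inSize, outSize) {
  var time = process.hrtime();
  var peakMemory = process.memoryUsage().rss;

  // highWaterMark and chunkSize control the buffer sizes discussed above
  var input = fs.createReadStream('/usr/share/dict/words', { highWaterMark: inSize });
  var gzip = zlib.createGzip({ chunkSize: outSize });
  var output = fs.createWriteStream('out.gz');

  // Sample memory usage periodically while the streams are running
  var memoryCheck = setInterval(function() {
    var rss = process.memoryUsage().rss;
    if (rss > peakMemory) peakMemory = rss;
  }, 50);

  input.on('end', function() {
    clearInterval(memoryCheck);
    var diff = process.hrtime(time);
    console.log('in: %d, gzip: %d, peak rss: %d, elapsed: %d ms',
      inSize, outSize, peakMemory, (diff[0] * 1e9 + diff[1]) / 1e6);
  });

  input.pipe(gzip).pipe(output);

  return input;
}

function run(times) {
  benchStream(1024 * times, 1024 * times).on('end', function() {
    if (times < 32) run(times * 2);            // double the buffer sizes each run
  });
}

run(1);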

This is a long example, but it just uses some of Node’s built-in functionality to gather memory statistics over time for streams designed to use different buffer sizes. The benchStream function performs most of the work and is executed several times. It records the current time using hrtime, which returns more precise measurements than Date.now() would. The input stream is the Unix dictionary file, which is piped through a gzip stream and then out to a file. Then benchStream uses setInterval to run a periodic check on the memory usage. When the input stream ends, the memory usage is calculated based on the values before and after the input file was gzipped.

The run function doubles the input file’s buffer and gzip buffer to show the impact on memory and the time taken to read the streams over time. When the reading of the input file completes, the memory usage and elapsed time will be printed. The input file is returned by the benchStream function so run can easily be called when benchmarking has finished. The run function will be called repeatedly, depending on the first argument passed to it.

Note that process.hrtime has been used to accurately benchmark the elapsed time. This method can be used for benchmarking because it’s precise, and also accepts a time argument for automatically calculating the elapsed time.

I (Alex) ran this program with a 20 MB file to try to generate more interesting results than /usr/share/dict/words, and I’ve included a graph of the results in figure 5.5.

Figure 5.5. A graphical representation of the memory usage of streams

When I experimented with various files, the results indicated that elapsed time was far less affected by buffer size than memory usage was. That suggests it’s generally desirable to use smaller buffers and be more conservative about memory usage, although this test should be repeated with a load-testing benchmark to really see how long it takes Node to process those buffers.

Node had an older API for streams that had different semantics for pausing a stream. Although the newer API should be used where possible, it’s possible to use the older API alongside the newer one. The next technique demonstrates how to use modules written with the older API.

Technique 36 Using the old streams API

Before Node 0.10 (and technically 0.9.4), streams had a different API. Code written using that API can be used with the newer APIs by wrapping it to behave like the newer stream.Readable class.

Problem

You want to use a module that implements the old-style streaming API with classes that use the newer APIs.

Solution

Use Readable.prototype.wrap.

Discussion

The older stream API had readable and writable streams, but pausing a stream was “advisory” only. This led to a different API design that wasn’t based around the newer streams2 classes. As people gradually realized how useful streamable classes are, a wealth of modules appeared on npm. Although the newer API solves key problems with the older design, there are still useful modules that haven’t been updated.

Fortunately, older classes can be wrapped using the Readable.prototype.wrap method provided by the stream module. It literally wraps the older interface to make it behave like the newer stream.Readable class—it effectively creates a Readable instance that uses the older class as its data source.

Listing 5.12 shows an example of a stream implemented with the older API that has been wrapped with the newer Readable class.

Listing 5.12. An old-style stream that has been wrapped
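
A sketch of the pattern: a legacy, Node 0.8-style readable stream that emits data events is wrapped with Readable.prototype.wrap so it can be piped to a streams2 destination:

var stream = require('stream');
var util = require('util');

// A legacy stream: inherits from the old base class and emits 'data' events
function LegacyStream() {
  stream.Stream.call(this);
  this.readable = true;                   // old API flag marking this as readable
}
util.inherits(LegacyStream, stream.Stream);

LegacyStream.prototype.start = function() {
  this.emit('data', 'Hello from the old streams API\n');
  this.emit('end');
};

var legacy = new LegacyStream();

// Wrap the legacy stream so it behaves like a streams2 Readable
var wrapped = new stream.Readable().wrap(legacy);
wrapped.pipe(process.stdout);

legacy.start();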

The example in listing 5.12 presents a simple class that inherits from the Node 0.8 stream module. The readable property is part of the old API, and signifies that this is a readable stream. Another indicator that this is a legacy stream is the data event. The newer Readable.prototype.wrap method is what translates all of this to make it compatible with the streams2 API style. At the end, the wrapped stream is piped to a Node 0.10 stream.

Now you should be able to use older streams with the newer APIs!

Sometimes streams need to change their behavior depending on the type of input that has been provided. The next technique looks at ways of doing just that.

Technique 37 Adapting streams based on their destination

Stream classes are typically designed to solve a specific problem, but there’s also potential for customizing their behavior by detecting how the stream is being used.

Problem

You want to make a stream behave differently when it’s piped to the TTY (the user’s shell).

Solution

Bind a listener to the pipe event, and then use stream.isTTY to check if the stream is bound to a terminal.

Discussion

This technique is a specific example of adapting a stream’s behavior to its environment, but the general approach could be adapted to other problems as well. Sometimes it’s useful to detect whether a stream is writing output to a TTY or something else—perhaps a file—because different behavior in each is desirable. For example, when printing to a TTY, some commands will use ANSI colors, but this isn’t usually advisable when writing files because strange characters would clutter the results.

Node makes detecting whether the current process is connected to a TTY simple—just use process.stdout.isTTY and process.stdin.isTTY. These are Boolean properties that are derived from OS-level bindings in Node’s source (in lib/tty.js).

The strategy to use for adapting a stream’s output is to create a new stream.Writable class and set an internal property based on isTTY. Then add a listener to the pipe event, which changes isTTY based on the newly piped stream that’s passed as the first argument to the listener callback.

Listing 5.13 demonstrates this by using two classes. The first, MemoryStream, inherits from stream.Readable and generates data based on Node’s memory usage. The second, OutputStream, monitors the stream it’s bound to so it can tell the readable stream about what kind of output it expects.

Listing 5.13. Using isTTY to adapt stream behavior
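
A sketch of the approach: the writable OutputStream listens for the pipe event and passes its own isTTY value to the readable stream on the other end, which then decides whether to colorize its output (the three-reading limit just keeps the example finite):

var stream = require('stream');
var util = require('util');

function MemoryStream() {
  stream.Readable.call(this);
  this.isTTY = process.stdout.isTTY;      // default to the process's own TTY state
  this.remaining = 3;                     // emit a few readings, then end
}
util.inherits(MemoryStream, stream.Readable);

MemoryStream.prototype._read = function() {
  if (this.remaining === 0) return this.push(null);
  this.remaining--;

  var text = JSON.stringify(process.memoryUsage()) + '\n';
  if (this.isTTY) {
    this.push('\u001b[32m' + text + '\u001b[39m');    // colors for terminals
  } else {
    this.push(text);                                   // plain text for files and pipes
  }
};

function OutputStream() {
  stream.Writable.call(this);
  this.isTTY = process.stdout.isTTY;

  var self = this;
  // 'pipe' fires on the destination; the source stream is passed to the listener
  this.on('pipe', function(source) {
    source.isTTY = self.isTTY;            // tell the readable what kind of output to produce
  });
}
util.inherits(OutputStream, stream.Writable);

OutputStream.prototype._write = function(chunk, encoding, callback) {
  process.stdout.write(chunk, callback);
};

new MemoryStream().pipe(new OutputStream());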

Internally, Node uses isTTY to adapt the behavior of the repl module and the readline interface. The example in listing 5.13 tracks the state of process.stdout.isTTY to determine what the original output stream was, and then copies that value to subsequent destinations. When the terminal is a TTY, colors are used; otherwise plain text is output instead.

Streams, like anything else, should be tested. The next technique presents a method for writing unit tests for your own stream classes.

Technique 38 Testing streams

Just like anything else you write, it’s strongly recommended that you test your streams. This technique explains how to use Node’s built-in assert module to test a class that inherits from stream.Readable.

Problem

You’ve written your own stream class and you want to write a unit test for it.

Solution

Use some suitable sample data to drive your stream class, and then call read() or write() to gather the results and compare them to the expected output.

Discussion

The common pattern for testing streams, used in Node’s source itself and by many open source developers, is to drive the stream being tested using sample data and then compare the end results against expected values.

The most difficult part of this can be coming up with suitable data to test. Sometimes it’s easy to create a text file, or a fixture in testing nomenclature, that can be used to drive the stream by piping it. If you’re testing a network-oriented stream, then you should consider using Node’s net or http modules to create “mock” servers that generate suitable test data.

Listing 5.14 is a modified version of the CSV parser from technique 34; it has been turned into a module so we can easily test it. Listing 5.15 is the associated test that creates an instance of CSVParser and then pushes some values through it.

Listing 5.14. The CSVParser stream
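
A sketch of how the module version might look; the parser internals are the same as in the earlier CSV parser sketch, with module.exports added so a test can require it (csv_parser.js is a placeholder filename):

// csv_parser.js — the same CSVParser as in the earlier sketch, exported as a module
var Transform = require('stream').Transform;
var util = require('util');

function CSVParser(options) {
  Transform.call(this, options);
  this.value = '';
  this.headers = [];
  this.values = [];
  this.line = 0;
}
util.inherits(CSVParser, Transform);

// _transform, addValue, and toObject are unchanged from the earlier sketch

module.exports = CSVParser;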

The CSVParser class is exported using module.exports so it can be loaded by the unit test. The _transform method will run later, when data is written to an instance of this class. Next up is a simple unit test for this class.

Listing 5.15. Testing the CSVParser stream
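
A sketch of such a test, assuming the parser above is saved as csv_parser.js and a small sample.csv fixture sits alongside it; the fixture contents and expected values here are invented for illustration:

var assert = require('assert');
var fs = require('fs');
var CSVParser = require('./csv_parser');

// sample.csv (invented fixture, each line ending with a newline):
//   name,location,role
//   Alex,UK,admin
//   Sam,France,user
var expected = [
  { name: 'Alex', location: 'UK', role: 'admin' },
  { name: 'Sam', location: 'France', role: 'user' }
];

var parser = new CSVParser();
var output = '';

fs.createReadStream(__dirname + '/sample.csv')
  .pipe(parser)
  .on('readable', function() {
    var chunk;
    while ((chunk = parser.read()) !== null) {
      output += chunk;                    // collect the parser's JSON output
    }
  });

// Wait until the streams have finished before running the assertions
process.on('exit', function() {
  var actual = output.trim().split('\n').map(function(line) {
    return JSON.parse(line);
  });
  assert.deepEqual(actual, expected);
  console.log('test passed');
});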

A fixture file, sample.csv, has been used to pipe data to the CSVParser instance. Then the assert.deepEqual method has been used to make it easy to compare the expected array with the actual array.

A listener is attached to the exit event because we want to wait for the streams to finish processing the data before running the assertion. Then data is read from the parser and pushed to an array to examine with assertions—the expected values are defined first. This pattern is used in Node’s own streams tests, and is a lightweight version of what test frameworks like Mocha and node-tap provide.

5.6. Summary

In this chapter you’ve seen how the built-in streamable APIs work, how to create new and novel streams using the base classes provided by Node, and how to use some more advanced techniques to structure programs with streams. As you saw in technique 30, building new streams starts with correctly inheriting from the base classes—and don’t forget to test those streams! For more on testing, refer back to technique 38.

As you saw, there are some novel uses of streams, like substack’s baudio module (https://github.com/substack/baudio) that speaks in streams of sound waves. There are also two streams APIs: the original Node 0.8 and below API, and the newer streams2 API. Forward compatibility is supported through the readable-stream module (https://github.com/isaacs/readable-stream), and backward compatibility is made possible by wrapping streams (technique 36).

A big part of working with streams is handling files. In the next chapter we’ll look at Node’s file system handling in detail.