Communicating Sequential Processes - Seven Concurrency Models in Seven Weeks (2014)


Chapter 6. Communicating Sequential Processes

If you’re a car nut like me, it’s easy to focus on the vehicle and forget about the roads it travels on. It’s fascinating to debate the relative merits of turbo-charging versus natural aspiration or a mid- versus a front-engine layout, forgetting that the most important aspect of a car has nothing to do with any of these things. Where you can go and how fast you can get there is primarily defined by the road network, not the car.

Similarly, the features and capabilities of a message-passing system are not primarily defined by the entities between which messages are exchanged or by the content of those messages, but by the transport over which they travel.

In this chapter we’ll look at a model that has surface similarities with actors but a very different feel—thanks to a difference in focus.

Communication Is Everything

As we saw in the last chapter, an actor program consists of independent, concurrently executing entities (called actors, or processes in Elixir) that communicate by sending each other messages. Each actor has a mailbox that stores messages until they’re handled.

A program using the communicating sequential processes model similarly consists of independent, concurrently executing entities that communicate by sending each other messages. The difference is one of emphasis—instead of focusing on the entities sending the messages, CSP focuses on the channels over which they are sent. Channels are first class—instead of each process being tightly coupled to a single mailbox, channels can be independently created, written to, read from, and passed between processes.

Like functional programming and actors, CSP is an old idea that’s experiencing a renaissance. CSP’s recent popularity is largely due to the Go language.[32] We’re going to cover CSP by examining the core.async library,[33] which brings Go’s concurrency model to Clojure.

In day 1 we’ll introduce the twin pillars upon which core.async is built: channels and go blocks. In day 2 we’ll construct a realistic example program with them. Finally, in day 3 we’ll see how core.async can be used within ClojureScript to make client-side programming easier.

Day 1: Channels and Go Blocks

The core.async library provides two primary facilities—channels and go blocks. Go blocks allow multiple concurrent tasks to be efficiently multiplexed across a limited pool of threads. But first, let’s look at channels.

Using core.async

The core.async library is a relatively recent addition to Clojure and is still in prerelease (so be aware that things may change). To use it, you need to make the library a dependency of your project and then import it. This is slightly complicated by the fact that it defines a few functions with names that clash with core Clojure library functions. To make it easier to experiment with, you can use the channels project in the book’s sample code, which imports core.async like this:


(ns channels.core
  (:require [clojure.core.async :as async :refer :all
             :exclude [map into reduce merge partition partition-by take]]))

The :refer :all allows most core.async functions to be used directly, but a few (those with names that clash with core library functions) have to be given the async/ prefix.

You can start a REPL with these definitions loaded by changing the directory to the channels project and typing lein repl.


A channel is a thread-safe queue—any task with a reference to a channel can add messages to one end, and any task with a reference to it can remove messages from the other. Unlike actors, where messages are sent to and from specific actors, senders don’t have to know about receivers, or vice versa.

A new channel is created with chan:

channels.core=> (def c (chan))


We can write to a channel with >!! and read from it with <!!:

channels.core=> (thread (println "Read:" (<!! c) "from c"))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@78fcc563>

channels.core=> (>!! c "Hello thread")

Read: Hello thread from c


We’re using the handy thread utility macro provided by core.async, which, as its name suggests, runs its code on a separate thread. That thread prints a message containing whatever it reads from the channel. This blocks until we write to the channel with >!!, at which point we see the message.


By default, channels are synchronous (or unbuffered)—writing to a channel blocks until something reads from it:

channels.core=> (thread (>!! c "Hello") (println "Write completed"))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@78fcc563>

channels.core=> (<!! c)
Write completed
"Hello"


We can create a buffered channel by passing a buffer size to chan:

channels.core=> (def bc (chan 5))


channels.core=> (>!! bc 0)


channels.core=> (>!! bc 1)


channels.core=> (close! bc)


channels.core=> (<!! bc)
0
channels.core=> (<!! bc)
1
channels.core=> (<!! bc)
nil

This creates a channel with a buffer large enough to contain five messages. As long as there’s space available, writing to a buffered channel completes immediately.

Closing Channels

The previous example demonstrated another feature of channels—they can be closed with close!. Reading from an empty closed channel returns nil, and writing to a closed channel silently discards the message. As you might expect, writing nil to a channel is an error:

channels.core=> (>!! (chan) nil)

IllegalArgumentException Can't put nil on channel ...

Here’s a function that uses what we’ve seen so far to read from a channel until it’s closed and to return everything read as a vector:


(defn readall!! [ch]
  (loop [coll []]
    (if-let [x (<!! ch)]
      (recur (conj coll x))
      coll)))

This loops with coll initially bound to the empty vector []. Each iteration reads a value from ch and, if the value is not nil, it’s added to coll. If the value is nil (the channel has been closed), coll is returned.

And here’s writeall!!, which takes a channel and a sequence and writes the entirety of the sequence to the channel, closing it when the sequence is exhausted:


(defn writeall!! [ch coll]
  (doseq [x coll]
    (>!! ch x))
  (close! ch))

Let’s see these functions in action:

channels.core=> (def ch (chan 10))


channels.core=> (writeall!! ch (range 0 10))


channels.core=> (readall!! ch)

[0 1 2 3 4 5 6 7 8 9]

You won’t be surprised to hear that core.async provides utilities that perform similar tasks, saving us the trouble of writing our own:

channels.core=> (def ch (chan 10))


channels.core=> (onto-chan ch (range 0 10))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@6b16d3cf>

channels.core=> (<!! (async/into [] ch))

[0 1 2 3 4 5 6 7 8 9]

The onto-chan function writes the entire contents of a collection onto a channel, closing it when the collection’s exhausted. And async/into takes an initial collection (the empty vector in the preceding example) and a channel and returns a channel. That channel will have a single collection written to it—the result of conjoining everything read from the channel with the initial collection.

Next we’ll use these utilities to investigate buffered channels in more depth.

Full Buffer Strategies

By default, writing to a full channel blocks, but we can choose an alternative strategy by passing a buffer to chan:

channels.core=> (def dc (chan (dropping-buffer 5)))


channels.core=> (onto-chan dc (range 0 10))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@147c0def>

channels.core=> (<!! (async/into [] dc))

[0 1 2 3 4]

Here we create a channel with a dropping buffer large enough to hold five messages, and then we write the numbers 0 to 9 to it. This doesn’t block, even though the channel cannot hold so many messages. When we read its contents, we find that only the first five messages are returned—all subsequent messages have been dropped.

Clojure also provides sliding-buffer:

channels.core=> (def sc (chan (sliding-buffer 5)))


channels.core=> (onto-chan sc (range 0 10))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@3071908b>

channels.core=> (<!! (async/into [] sc))

[5 6 7 8 9]

As before, we create a channel large enough to hold five messages, but this time with a sliding buffer. When we read its contents, we find that the five most recent messages are returned—writing to a full channel with a sliding buffer drops the oldest message. We’ll look into channels in much more detail later, but before then let’s look at core.async’s other headline feature—go blocks.

Joe asks:

What—No Automatically Growing Buffer?

We’ve now seen all three types of buffer provided by core.async as standard—blocking, dropping, and sliding. It would be quite possible to create one that simply grows as it needs to accommodate more messages. So why isn’t this provided as standard?

The reason is the age-old lesson that, whenever you have an “inexhaustible” resource, sooner or later you will exhaust it. This might be because over time your program is asked to work on larger problems, or it might be a bug that results in messages piling up because whatever should be handling them doesn’t do so.

If you avoid thinking about what to do when it happens, eventually this will lead to a damaging, obscure, and difficult-to-diagnose bug sometime in the future. Indeed, flooding a process’s mailbox is one of the few ways to comprehensively crash an Erlang system.[34] Better to think about how you want to handle a full buffer today and nip the problem in the bud.

Go Blocks

Threads have both an overhead and a startup cost, which is why most modern programs avoid creating threads directly and use a thread pool instead (see Thread-Creation Redux). Indeed, the thread macro we used earlier today uses a CachedThreadPool under the hood.

Thread pools aren’t always very convenient to use, though. In particular, they are problematic if the code we want to run might block.

The Problem with Blocking

Thread pools are a great way to handle CPU-intensive tasks—those that tie a thread up for a brief period and then return it to the pool to be reused. But what if we want to do something that involves communication? Blocking a thread ties it up indefinitely, eliminating much of the value of using a thread pool.

There are ways around this, but they typically involve restructuring code to make it event-driven, a style of programming that will be familiar to anyone who’s done UI programming or worked with any of the recent breed of evented servers.

Although this works, it breaks up the natural flow of control and can make code difficult to read and reason about. Worse, it can lead to an excess of global state, with event handlers saving data for use by later handlers. And as we’ve seen, state and concurrency really don’t mix.

Go blocks provide an alternative that gives us the best of both worlds—the efficiency of event-driven code without having to compromise its structure or readability. They achieve this by transparently rewriting sequential code into event-driven code under the hood.

Inversion of Control

In common with other Lisps, Clojure provides a powerful macro system. If you’re used to macros in other languages (C/C++ pre-processor macros, for example), Lisp macros can seem like magic, enabling dramatic code transformations. The go macro is particularly magical.

Code within a go block is transformed into a state machine. Instead of blocking when it reads from or writes to a channel, the state machine parks, relinquishing control of the thread it’s executing on. When it’s next able to run, it performs a state transition and continues execution, potentially on another thread.

This represents an inversion of control, allowing the core.async runtime to efficiently multiplex many go blocks over a limited thread pool. Just how efficiently we’ll soon see, but first let’s see an example.


Here’s a simple example of go in action:

channels.core=> (def ch (chan))


channels.core=> (go

#_=> (let [x (<! ch)

#_=> y (<! ch)]

#_=> (println "Sum:" (+ x y))))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@13ac7b98>

channels.core=> (>!! ch 3)


channels.core=> (>!! ch 4)


Sum: 7

We start by creating a channel ch, followed by a go block that reads two values from it, then prints their sum. Although it looks like the go block should block when it reads from the channel, something far more interesting is going on.

Instead of using <!! to read from the channel, our go block is using <!. The single exclamation mark indicates that this is the parking version of a channel read, not the blocking version. As you might expect, >! is the parking version of the blocking >!!.

The go macro converts this sequential code into a state machine with three states:


1. The initial state immediately parks, waiting for something to be available for reading from ch. When it is, the state machine transitions to state 2.

2. Next the state machine binds the value read from ch to x and then parks, waiting for another value to be available, after which it transitions to state 3.

3. Finally, the state machine binds the value read from ch to y, prints a message, and terminates.
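To get a feel for what this transformation saves us, here's a rough sketch (not from the book) of the same two-read logic written by hand in the event-driven style. It uses core.async's take!, which registers a callback to run when a value becomes available instead of blocking or parking:

```clojure
;; A sketch of the hand-written, callback-based equivalent of the go block
;; above. Each take! registers a callback rather than blocking the thread.
(require '[clojure.core.async :refer [chan take! >!!]])

(def ch (chan))

(take! ch (fn [x]
            (take! ch (fn [y]
                        (println "Sum:" (+ x y))))))

(>!! ch 3)
(>!! ch 4)   ; once both values arrive, prints "Sum: 7"
```

Even with just two reads, the natural flow of control is already fragmenting into nested callbacks; the go macro generates this shape of code for us while letting us write it sequentially.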

Joe asks:

What Happens If I Block in a Go Block?

If you call a blocking function, such as <!!, in a go block, you will simply block the thread it happens to be running on. Your code will probably execute OK (although if you block enough threads, you might deadlock because no more are available), but doing so defeats the purpose of using go blocks. Nothing will warn you if you make this mistake, however, so it’s up to you to be on your guard.

Happily, if you make the opposite mistake, you will be warned:

channels.core=> (<! ch)

AssertionError Assert failed: <! used not in (go ...) block

nil clojure.core.async/<! (async.clj:83)

Go Blocks Are Cheap

The point of all the go macro’s cleverness is efficiency. Because (unlike threads) go blocks are cheap, we can create many of them without running out of resources. This may seem like a small benefit, but the ability to freely create concurrent tasks without worry is transformative.

You may have noticed that go (and thread, for that matter) returns a channel. This channel will have the result of the go block written to it when it’s complete:

channels.core=> (<!! (go (+ 3 4)))
7
We can use this fact to create a small function that creates a very large number of go blocks, allowing us to see just how inexpensive go blocks are:


(defn go-add [x y]
  (<!! (nth (iterate #(go (inc (<! %))) (go x)) y)))

This contender for the title “world’s most inefficient addition function” adds x to y by creating a pipeline of y go blocks, each one of which increments its argument by one.

To see how this works, let’s build it up in stages:

1. The anonymous function #(go (inc (<! %))) creates a go block that takes a single channel argument, reads a single value from it, and returns a channel containing that value incremented by one.

2. This function is passed to iterate with an initial value of (go x) (a channel that simply has the value x written to it). Recall that iterate returns a lazy sequence of the form (x (f x) (f (f x)) (f (f (f x))) …).

3. We read the y-th element of this sequence with nth, the value of which will be a channel containing the result of incrementing x y times.

4. Finally, we read the value of that channel with <!!.

Let’s see it in action:

channels.core=> (time (go-add 10 10))

"Elapsed time: 1.935 msecs"


channels.core=> (time (go-add 10 1000))

"Elapsed time: 5.311 msecs"


channels.core=> (time (go-add 10 100000))

"Elapsed time: 734.91 msecs"


So that’s 100,000 go blocks created and executed in around three-quarters of a second. That means that a go block compares very favorably to an Elixir process—a very impressive result given that Clojure runs on the JVM, whereas Elixir runs on the Erlang virtual machine, which was built with efficient concurrency in mind.

Now that we’ve seen both channels and go blocks in action, let’s look at how they can be combined to build more complex operations over channels.

Operations over Channels

If you’re thinking that channels have more than a little in common with sequences, you’re not wrong. Like sequences, channels represent ordered sets of values. Like sequences, we should be able to implement higher-level functions that operate over all of a channel’s contents—functions like map, filter, and so on. And like sequences, we should be able to chain those functions to create composite operations.

Mapping over a Channel

Here’s a channel-oriented version of map:


(defn map-chan [f from]
  (let [to (chan)]
    (go-loop []
      (when-let [x (<! from)]
        (>! to (f x))
        (recur))
      (close! to))
    to))


This takes a function (f) and a source channel (from). It starts by creating a destination channel (to), which is returned at the end of the function. Before then, however, it creates a go block with go-loop, a utility macro that’s equivalent to (go (loop …)). The body of the loop uses when-let to read from from and bind the value read to x. If x isn’t nil, the body of the when-let is executed, (f x) is written to to, and the loop executes again. If x is nil (the channel has been closed), to is closed.

Here it is in action:

channels.core=> (def ch (chan 10))


channels.core=> (def mapped (map-chan (partial * 2) ch))


channels.core=> (onto-chan ch (range 0 10))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@9f3d43e>

channels.core=> (<!! (async/into [] mapped))

[0 2 4 6 8 10 12 14 16 18]

As you might expect, core.async provides its own version of map-chan, called map<. There’s also a channel-oriented version of filter called filter<, one of mapcat called mapcat<, and so on. These can be combined to create chains of channels:

channels.core=> (def ch (to-chan (range 0 10)))


channels.core=> (<!! (async/into [] (map< (partial * 2) (filter< even? ch))))

[0 4 8 12 16]

The preceding code uses to-chan, another core.async utility function, which creates and returns a channel containing the contents of a sequence, closing it when the sequence is exhausted.

We’re almost at the end of day 1, but before we’re done, let’s have a bit of fun.

A Concurrent Sieve of Eratosthenes

Just because we can, here’s a concurrent version of the sieve of Eratosthenes. The get-primes function returns a channel to which all the prime numbers up to limit will subsequently be written:


(defn factor? [x y]
  (zero? (mod y x)))

(defn get-primes [limit]
  (let [primes (chan)
        numbers (to-chan (range 2 limit))]
    (go-loop [ch numbers]
      (when-let [prime (<! ch)]
        (>! primes prime)
        (recur (remove< (partial factor? prime) ch)))
      (close! primes))
    primes))


We’ll go through how this works in a minute (although I encourage you to work through it yourself first—you should have everything you need to do so). But first, let’s prove that it works as advertised. The following main function calls get-primes and then prints out what’s written to the channel it returns:


(defn -main [limit]
  (let [primes (get-primes (edn/read-string limit))]
    (loop []
      (when-let [prime (<!! primes)]
        (println prime)
        (recur)))))

And here’s what we get when we run it:

$ lein run 100000
2
3
5
7
11
...
Let’s see how get-primes works. It starts by creating a channel called primes, which is returned at the end of the function. It then enters a loop, with ch initially bound to numbers, a channel that will have all the numbers from 2 to limit written to it courtesy of to-chan.

The loop reads the first entry from ch, which we know is a prime number (we’ll see why this is true soon), so it’s written to primes. We then loop back around, except this time ch is bound to the result of (remove< (partial factor? prime) ch).

The remove< function is similar to filter<, except that it returns a channel to which only values for which the predicate returns false are written. In our case, it will be a channel with all values removed for which the prime we’ve just identified is a factor.
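To check remove< in isolation, here's a quick REPL-style sketch (not from the book), reusing the async/into idiom from day 1:

```clojure
;; remove< passes along only the values for which the predicate is false
(<!! (async/into [] (remove< even? (to-chan (range 0 10)))))
;; => [1 3 5 7 9]
```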

So, get-primes creates a pipeline of channels; the first contains all numbers from 2 to limit, the second has all numbers that are a multiple of 2 removed, the next has all multiples of 3 removed, and so on, as shown in the following diagram:


Figure 9. A concurrent sieve of Eratosthenes

I don’t want to give you the impression that this is an efficient way to implement a parallel prime number sieve—it’s too profligate with channels for that to be true. But it’s a nice demonstration of how channels can be freely combined to create arbitrary communication patterns.

Day 1 Wrap-Up

This brings us to the end of day 1. In day 2 we’ll see how to read from more than one channel and how to construct an IO-intensive program with channels and go blocks.

What We Learned in Day 1

The twin pillars of core.async are channels and go blocks:

· By default, channels are synchronous (unbuffered)—writing to a channel blocks until something reads from it.

· Alternatively, channels can be buffered. Different buffering strategies allow us to decide how to handle a full buffer—we can block, discard the oldest value (sliding buffer), or discard the most recently written value (dropping buffer).

· Go blocks utilize inversion of control to rewrite sequential code as a state machine. Instead of blocking, go blocks are parked, allowing the thread that they’re running on to be used by another go block.

· The blocking versions of channel operations end with two exclamation marks (!!), whereas the parking versions end with a single exclamation mark (!).

Day 1 Self-Study


· The core.async documentation

· Either Timothy Baldridge’s “Core Async Go Macro Internals” screencasts or Huey Petersen’s “The State Machines of core.async” blog post, both of which describe how the go macro implements inversion of control.


· Our implementation of map-chan created and returned a synchronous (unbuffered) channel. What would happen if it used a buffered channel instead? Which is preferable? Under what circumstances (if any) would a buffered channel be an appropriate choice?

· As well as map<, core.async provides map>. How do they differ? Create your own version of map>. When might you use one, and when the other?

· Create a channel-based version of a parallel map (similar to Clojure’s existing pmap or the parallel map function we created in Elixir in the previous chapter).

Day 2: Multiple Channels and IO

Today we’ll see how core.async makes asynchronous IO both simpler and easier to understand. But before then, we’ll look at a feature we’ve not yet seen—handling multiple channels at a time.

Handling Multiple Channels

So far we’ve dealt only with a single channel at a time, but there’s no reason we have to restrict ourselves to doing so. The alt! function allows us to write code that can deal with more than one channel:

channels.core=> (def ch1 (chan))


channels.core=> (def ch2 (chan))


channels.core=> (go-loop []

#_=> (alt!

#_=> ch1 ([x] (println "Read" x "from channel 1"))

#_=> ch2 ([x] (println "Twice" x "is" (* x 2))))

#_=> (recur))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@d8fd215>

channels.core=> (>!! ch1 "foo")

Read foo from channel 1


channels.core=> (>!! ch2 21)

Twice 21 is 42


Here we create two channels, ch1 and ch2, and then we create a go block that loops forever, using alt! to read from both. If there’s something available to read from ch1, it’s printed. If there’s something available to read from ch2, it’s doubled and printed.

It should be pretty clear from context what’s going on here—the alt! macro takes pairs of arguments, the first of which is a channel and the second of which is code that’s executed if there’s anything to read from that channel. In our case that code looks similar to an anonymous function—the value read from the channel is bound to x and the subsequent println executed. But it’s not an anonymous function—it doesn’t start with fn.

This is another example of Clojure’s macro system working its magic, allowing alt! to be both more concise and more efficient than it would if it used anonymous functions.

Joe asks:

What About Writing to Multiple Channels?

We’ve only scratched the surface of the alt! macro—as well as reading from multiple channels, it can also be used to write to multiple channels, or even a mix of reads and writes. We’re not going to use any of this functionality in this book, but it’s worth consulting the documentation if you’re interested in exploring alt! further.
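As a small taste, here is a minimal sketch (not an example from the book) of alt! performing a write. A put clause is expressed as a [channel value] pair wrapped in a vector:

```clojure
;; A sketch: mixing a read clause and a write clause in one alt!.
;; The put clause's port is written as [[channel value]].
(def in  (chan))
(def out (chan))

(go (alt!
      in          ([x] (println "Read" x "from in"))
      [[out :ready]] (println "Wrote :ready to out")))

(<!! out)   ; nothing is writing to in, so the put clause completes
```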


The timeout function returns a channel that closes after a certain number of milliseconds:

channels.core=> (time (<!! (timeout 10000)))

"Elapsed time: 10001.662 msecs"


This can be used in conjunction with alt! to allow other channel operations to time out, as in this example:

channels.core=> (def ch (chan))


channels.core=> (let [t (timeout 10000)]

#_=> (go (alt!

#_=> ch ([x] (println "Read" x "from channel"))

#_=> t (println "Timed out"))))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@28134be9>


Timed out

Timeouts are nothing new, of course, but this approach in which timeouts are reified (represented by a concrete entity) is surprisingly powerful, as we’ll see next.

Reified Timeouts

Most systems support timeouts on a per-request basis. Java’s URLConnection class, for example, provides the setReadTimeout method—if the server doesn’t respond within the relevant number of milliseconds, read will throw an IOException.

This is fine if you’re making a single request. But what if you want to limit the total time taken by a series of connections? Per-connection timeouts are little help here, but a reified timeout gives you exactly what you need—simply create a single timeout and use it for each connection in the sequence.
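The pattern can be sketched with a hypothetical helper (not from the book): create the timeout channel once, then race each subsequent read against that same channel:

```clojure
;; A sketch: one reified timeout bounds the *total* time taken by a
;; series of reads, returning whatever was read before the deadline.
(defn read-all-within [chs ms]
  (let [deadline (timeout ms)]
    (loop [chs (seq chs), acc []]
      (if chs
        (alt!! :priority true
          deadline    acc                                   ; overall deadline hit
          (first chs) ([v] (recur (next chs) (conj acc v))))
        acc))))
```

Because the deadline is an ordinary channel, it can be threaded through any number of operations, something per-request timeouts cannot express.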

To illustrate this in action, let’s modify the sieve example we created yesterday so that, instead of taking a numeric limit, it simply generates as many prime numbers as it can in a given number of seconds.

We’ll start by modifying get-primes so that it generates primes forever:


(defn get-primes []
  (let [primes (chan)
        numbers (to-chan (iterate inc 2))]
    (go-loop [ch numbers]
      (when-let [prime (<! ch)]
        (>! primes prime)
        (recur (remove< (partial factor? prime) ch)))
      (close! primes))
    primes))


Instead of our initial channel being generated by (range 2 limit), we use the infinite sequence (iterate inc 2).

Here’s how we call it:


(defn -main [seconds]
  (let [primes (get-primes)
        limit (timeout (* (edn/read-string seconds) 1000))]
    (loop []
      (alt!! :priority true
        limit nil
        primes ([prime] (println prime) (recur))))))

We’re using alt!!, which is, as you would expect, the blocking version of alt!. This blocks until either a new prime is available or limit times out, in which case it simply returns nil. The :priority true option ensures that the clauses passed to alt!! are evaluated in order (by default, if two clauses could execute, one is chosen nondeterministically). This avoids the (admittedly unlikely) event of primes being generated so quickly that there’s always one available and the timeout clause never gets evaluated. This is a very natural way to express the problem we’re trying to solve—much more natural than anything we could create with per-request timeouts.

In the next section we’ll use timeouts, together with Clojure’s macro system, to build a convenient utility that addresses a common use case—polling.

Asynchronous Polling

Later today we’re going to build an RSS reader. Among other things, it will need to poll the news-feeds it’s monitoring to detect new articles. In this section we’ll use timeouts, together with Clojure’s macro support, to build a utility that makes efficient, asynchronous polling almost trivially easy.

A Polling Function

The timeout function we saw earlier today is exactly what we need to implement polling. Here’s a function that takes an interval in seconds, together with a function, and calls that function once every interval:


(defn poll-fn [interval action]
  (let [seconds (* interval 1000)]
    (go (while true
          (action)
          (<! (timeout seconds))))))

It’s simple enough, and it works exactly as you might expect:

polling.core=> (poll-fn 10 #(println "Polling at:" (System/currentTimeMillis)))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@6e624159>


Polling at: 1388827086165

Polling at: 1388827096166

Polling at: 1388827106168

But there’s a problem—you might think that because poll-fn calls the function it’s given within a go block, that function should be able to call parking functions. But let’s see what happens if we try:

polling.core=> (def ch (to-chan (iterate inc 0)))


polling.core=> (poll-fn 10 #(println "Read:" (<! ch)))

Exception in thread "async-dispatch-1" java.lang.AssertionError:

Assert failed: <! used not in (go ...) block


The problem is that parking calls need to be made directly within a go block—Clojure’s macro system is unable to perform its magic otherwise.

A Polling Macro

The solution is to write our polling utility as a macro instead of as a function:


(defmacro poll [interval & body]
  `(let [seconds# (* ~interval 1000)]
     (go (while true
           (do ~@body)
           (<! (timeout seconds#))))))

We can’t discuss Clojure macros in detail here, so you’ll have to take quite a bit of this on trust. We’ll look at poll’s expansion soon, but in the meantime here are a few pointers that should help you understand how it works:

· Instead of being directly compiled, a macro returns code that is then compiled in turn.

· The backquote (`) is the syntax quote operator. It takes source code and, instead of executing it, returns a representation of it that can subsequently be compiled.

· Within that code, we can use the ~ (unquote) and ~@ (unquote splice) operators to refer to arguments passed to the macro.

· The # (auto-gensym) suffix indicates that Clojure should automatically generate a unique name (which guarantees that it won’t clash with any names used by code passed to the macro).

Let’s see it in action:

polling.core=> (poll 10

#_=> (println "Polling at:" (System/currentTimeMillis))

#_=> (println (<! ch)))

#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@1bec079e>


Polling at: 1388829368011


Polling at: 1388829378018


Because macros are expanded at compile time, the code passed to poll is inlined and therefore directly contained within poll’s go block, meaning that we can pass code that contains parking calls. But that’s not the only advantage of using a macro—because we’re passing a chunk of code rather than a function, the syntax is much more natural—no need to create an anonymous function. In fact, we’ve created our own control structure.

We can examine the code generated by poll by looking at its macro expansion:

polling.core=> (macroexpand-1

#_=> '(poll 10

#_=> (println "Polling at:" (System/currentTimeMillis))

#_=> (println (<! ch))))

(clojure.core/let [seconds__2691__auto__ (clojure.core/* 10 1000)]
  (clojure.core.async/go
    (clojure.core/while true
      (do
        (println "Polling at:" (System/currentTimeMillis))
        (println (<! ch)))
      (clojure.core.async/<! (clojure.core.async/timeout seconds__2691__auto__)))))

I’ve reformatted the output of macroexpand-1 slightly to make it easier to read. You can see how the code that was passed to poll has been pasted (spliced) into the code within the macro itself and how seconds# has been turned into a unique name (to see why this is important, imagine that the code that we passed to poll used seconds to mean something else).

We’ll see a practical use of our poll macro in the next section.

Asynchronous IO

IO is one area where asynchronous code comes into its own—instead of the traditional approach of having a thread per connection, asynchronous IO allows us to start a number of operations and receive a notification whenever one of them has data available. Although this is a powerful approach, it can be challenging, with code tending to turn into a mess of callbacks calling callbacks. In this section we’ll see how core.async can make it much easier.

In keeping with the word-counting examples from earlier chapters, we’re going to build an RSS reader that monitors a set of news feeds and, whenever it sees a new article, counts how many words it contains. We’re going to construct this as a pipeline of concurrent go blocks connected by channels:

1. The lowest-level go block monitors a single news feed, polling it once every sixty seconds. After parsing the returned XML, it extracts links to news articles and passes them along the pipeline.

2. The next go block maintains a list of all the articles that have already been retrieved from a particular news feed. Whenever it sees a new article, it passes its URL along the pipeline.

3. The next go block retrieves news articles in turn, counts the words contained within, and passes the resulting counts along the pipeline.

4. The counts from multiple news feeds are merged into a single channel.

5. The highest-level go block monitors this merged channel and prints new counts as they’re received.

This structure is shown in Figure 10, The Structure of the RSS Reader.


Figure 10. The Structure of the RSS Reader

Let’s start by seeing how to integrate an existing asynchronous IO library into core.async.

From Callbacks to Channels

We’re going to use the http-kit library.[35] In common with many asynchronous IO libraries, http-kit indicates that an operation has completed by calling a callback function:

wordcount.core=> (require '[org.httpkit.client :as http])
wordcount.core=> (defn handle-response [response]
            #_=>   (let [url (get-in response [:opts :url])
            #_=>         status (:status response)]
            #_=>     (println "Fetched:" url "with status:" status)))
wordcount.core=> (http/get "" handle-response)
#<core$promise$reify__6310@3a9280d0: :pending>
Fetched: with status: 200

Our first task is to get http-kit to integrate with core.async by wrapping http/get. We’re going to use a function we’ve not seen before—put! doesn’t have to be called within a go block and implements a “fire and forget” write to a channel (and will neither block nor park the task it’s called from):


(defn http-get [url]
  (let [ch (chan)]
    (http/get url (fn [response]
                    (if (= 200 (:status response))
                      (put! ch response)
                      (do (report-error response)
                          (close! ch)))))
    ch))


We start by creating a channel, which is returned at the end of the function (a pattern that should be becoming familiar to you by now), and then we call http/get, which returns immediately. At some point in the future, when the GET operation completes, our callback is called. If the status is 200 (success), the callback simply writes the response to the channel, and if the status is anything else, it reports an error and closes the channel.
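Since core.async borrows Go's concurrency model, the same callback-to-channel adaptation can be sketched in Go, which has channels built into the language. Here fetchAsync is a made-up stand-in for a callback-based API like http-kit's http/get; nothing about it comes from a real library:

```go
package main

import "fmt"

// fetchAsync stands in for a callback-based IO API: it invokes cb from
// another goroutine once the request "completes". The URL is ignored and
// the response is canned - this is purely illustrative.
func fetchAsync(url string, cb func(status int, body string)) {
	go cb(200, "<html>hello</html>")
}

// httpGet adapts the callback API to a channel, in the same shape as the
// http-get function above: on success the body is written to the channel;
// on failure the channel is closed.
func httpGet(url string) <-chan string {
	ch := make(chan string)
	fetchAsync(url, func(status int, body string) {
		if status == 200 {
			ch <- body
		} else {
			close(ch)
		}
	})
	return ch
}

func main() {
	body, ok := <-httpGet("http://example.com/")
	fmt.Println(ok, body) // prints "true <html>hello</html>"
}
```

The shape is identical to the Clojure version: create a channel, hand the library a callback that writes to it, and return the channel immediately.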

Next, we’ll create a function that polls an RSS feed.

Polling a Feed

As you would hope, now that we’ve got http-get and poll, polling an RSS feed is simplicity itself:


(def poll-interval 60)

; Simple-minded feed-polling function
; WARNING: Don't use in production (use conditional get instead)
(defn poll-feed [url]
  (let [ch (chan)]
    (poll poll-interval
      (when-let [response (<! (http-get url))]
        (let [feed (parse-feed (:body response))]
          (onto-chan ch (get-links feed) false))))
    ch))


The parse-feed and get-links functions use the Rome library to parse the XML returned by the news feed.[36] We won’t look at them here, but you can examine the source code if you’re interested in the details.

The list of links returned by get-links is written to ch with onto-chan. By default, onto-chan closes the channel when the sequence it’s given is exhausted; we disable this behaviour by passing false as the final argument.

Here it is in action:

wordcount.core=> (ns wordcount.feed)
wordcount.feed=> (def feed (poll-feed ""))
wordcount.feed=> (loop []
            #_=>   (when-let [url (<!! feed)]
            #_=>     (println url)
            #_=>     (recur)))
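The onto-chan behaviour that poll-feed relies on (copying a whole sequence onto a channel without closing it) is easy to picture in Go terms. The ontoChan helper below is our own invention for illustration, not part of any library:

```go
package main

import "fmt"

// ontoChan mirrors core.async's onto-chan with close? = false: it copies a
// slice onto a channel from a separate goroutine and leaves the channel
// open afterwards, so later polls can write more links to the same channel.
func ontoChan(ch chan<- string, links []string) {
	go func() {
		for _, l := range links {
			ch <- l
		}
	}()
}

func main() {
	ch := make(chan string)
	ontoChan(ch, []string{"a", "b", "c"})
	for i := 0; i < 3; i++ {
		fmt.Print(<-ch)
	}
	fmt.Println() // prints "abc"
}
```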

Next we’ll see how to filter the links returned by poll-feed to remove duplicates.

Don't Try This at Home

Although this simple polling strategy is OK for an example in a book, please don’t use it in production. Fetching the entire feed each time you poll places an unnecessarily high load on both your network bandwidth and the server you’re polling, a load that can be reduced by using HTTP’s conditional get.[37]

Unique Links

Our poll-feed function simply returns every link it finds every time it polls the news feed, which results in many duplicates. What we really want is a channel that contains just the new links that have appeared on the feed. This is exactly what the following function gives us:


(defn new-links [url]
  (let [in (poll-feed url)
        out (chan)]
    (go-loop [links #{}]
      (let [link (<! in)]
        (if (contains? links link)
          (recur links)
          (do
            (>! out link)
            (recur (conj links link))))))
    out))


We start by creating two channels, in and out. The first is the channel returned by poll-feed; the second is where we’ll write new links. We then start a loop within a go block that maintains links, a set of all the links we’ve seen to date, which is initially bound to the empty set #{}. Whenever we read a link from in, we check to see whether it’s already in links. If it is, we do nothing; otherwise we write the new link to out and add it to links.

Run this from the REPL, and instead of a new tranche of links being generated every sixty seconds, you should only see new links being returned.
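For comparison, here is a sketch of the same duplicate-filtering loop in Go, whose channels core.async's are modelled on. The set of seen links becomes a map, and the go block becomes a goroutine; newLinks here takes its input channel directly rather than polling a feed:

```go
package main

import "fmt"

// newLinks mirrors the new-links function above: it reads links from in,
// remembers every one it has seen, and forwards only new ones to out.
func newLinks(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		seen := map[string]bool{}
		for link := range in {
			if !seen[link] {
				seen[link] = true
				out <- link
			}
		}
		close(out)
	}()
	return out
}

func main() {
	in := make(chan string)
	go func() {
		for _, l := range []string{"a", "b", "a", "c", "b"} {
			in <- l
		}
		close(in)
	}()
	for link := range newLinks(in) {
		fmt.Print(link)
	}
	fmt.Println() // prints "abc"
}
```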

Now that we have a feed of links to new articles, the next step is to fetch each of them in turn and count how many words they contain.

Counting Words

With what we’ve seen so far, the get-counts function almost writes itself:


(defn get-counts [urls]
  (let [counts (chan)]
    (go (while true
          (let [url (<! urls)]
            (when-let [response (<! (http-get url))]
              (let [c (count (get-words (:body response)))]
                (>! counts [url c]))))))
    counts))


It takes a channel urls and, for each URL read from it, fetches the article with http-get, counts the words it contains, and writes a two-element vector (the article's URL and its word count) to its output channel.

We’re almost done—now we just need to wire everything together.

Putting It All Together

Here’s a main function that implements our complete RSS word counter:


1: (defn -main [feeds-file]
2:   (with-open [rdr (io/reader feeds-file)]
3:     (let [feed-urls (line-seq rdr)
4:           article-urls (doall (map new-links feed-urls))
5:           article-counts (doall (map get-counts article-urls))
6:           counts (async/merge article-counts)]
7:       (while true
8:         (println (<!! counts))))))

This creates a program that takes a file containing a list of news-feed URLs, one on each line. We create a reader for the file on line 2 (Clojure’s with-open function ensures that the file is closed when the reader goes out of scope). And then we convert it into a sequence of URLs with line-seq (line 3). Mapping new-links over this (line 4) turns it into a sequence of channels, each of which will have links to new articles written to it when they’re published. And mapping get-counts over that sequence (line 5) gives us a sequence of channels that will have counts written to them whenever an article is published.

Finally, we use async/merge (line 6) to merge this sequence of channels into a single channel that contains anything written to any of its source channels. The code then loops forever, printing anything that’s written to that merged channel (line 7). Here it is in action:

$ lein run feeds.txt
[ 10671]
[ 11188]
[ 3488]

Keep an eye on your CPU usage while running it. Not only is this code very straightforward and easy to read, but it’s very efficient, capable of monitoring hundreds of feeds concurrently while barely consuming any CPU resources.
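As a rough picture of what async/merge does under the hood, here is an illustrative Go version (merge is our own name and implementation, not a standard function): each source channel gets a goroutine that copies its values to a shared output channel, which closes once every source is exhausted:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge fans several source channels into one: anything written to any
// source appears on the returned channel, which closes when all sources do.
func merge(sources ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(src <-chan string) {
			defer wg.Done()
			for v := range src {
				out <- v
			}
		}(src)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a, b := make(chan string), make(chan string)
	go func() { a <- "feed-a"; close(a) }()
	go func() { b <- "feed-b"; close(b) }()
	var got []string
	for v := range merge(a, b) {
		got = append(got, v)
	}
	sort.Strings(got) // arrival order is nondeterministic
	fmt.Println(got)  // prints "[feed-a feed-b]"
}
```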

Joe asks:

Why No Buffered Channels?

Take a look at the channels we created today—all of them are unbuffered (synchronous). Newcomers to CSP tend to assume that buffered channels will be used much more frequently than unbuffered, but in fact the opposite is true. Buffered channels do have valid use cases, but think carefully before using one. Make sure that a buffer is necessary.
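A quick Go illustration of why unbuffered channels are a sensible default: each send is a rendezvous, so the producer can never run ahead of the consumer, and no hidden queue builds up between them:

```go
package main

import "fmt"

// relay sends each value over an unbuffered channel to a consumer goroutine.
// Because the channel is unbuffered (like core.async's (chan)), every send
// parks until the consumer is ready to receive - the two sides stay in step.
func relay(values []int) []int {
	ch := make(chan int) // unbuffered
	out := make(chan []int)
	go func() {
		var got []int
		for v := range ch {
			got = append(got, v)
		}
		out <- got
	}()
	for _, v := range values {
		ch <- v // rendezvous: completes only when the consumer takes it
	}
	close(ch)
	return <-out
}

func main() {
	fmt.Println(relay([]int{1, 2, 3})) // prints "[1 2 3]"
}
```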

Day 2 Wrap-Up

That brings us to the end of day 2. In day 3 we’ll see how to use core.async client-side via ClojureScript.

What We Learned in Day 2

Channels and go blocks allow us to create efficient asynchronous code that reads naturally, without the complexity that normally results from using callback functions.

· Existing callback-based APIs can be brought into the asynchronous world by providing a minimal callback function that simply writes to a channel.

· The alt! macro allows a task to read from, or write to, multiple channels.

· The timeout function returns a channel that closes after an interval—allowing timeouts to be treated as first-class entities (reified).

· Parking calls need to be directly contained within a go block. Clojure’s macros can be used to inline code, allowing larger go blocks to be broken up without falling foul of this limitation.

Day 2 Self-Study


· As well as alt!, core.async also provides alts!. How do they differ? When might you use one and when the other?

· In addition to async/merge, core.async provides a number of ways to combine multiple channels. Find the documentation for pub, sub, mult, tap, mix, and admix. When might they be useful?


· Spend some time working through the order in which things take place in the RSS reader. Notice that because we’re using unbuffered channels throughout, the result is very similar to a dataflow program, with earlier go blocks in the pipeline executing as a result of later ones being available to consume data.

What would happen if you used buffered channels instead? Are there any benefits to doing so? What problems are caused by using buffered channels?

· Implement your own version of async/merge. Remember to handle the case where one or more of the source channels are closed. (Hint: You might find this easier to implement with alts! than with alt!).

· Use Clojure’s macro expansion facility to examine the macro expansion of alt!:

channels.core=> (macroexpand-1 '(alt! ch1 ([x] (println x)) ch2 ([y] (println y))))

· You will probably find it easier to understand if you format the code first to get the indentation right and if you remove the clojure.core prefixes. Can you see how alt! achieves the effect of calling an anonymous function without actually doing so?

Day 3: Client-Side CSP

ClojureScript is a version of Clojure that, instead of compiling to Java bytecodes, cross-compiles to JavaScript. This means that it’s possible to create a web app in which both the server- and client-side code are written in Clojure.

One of the most compelling reasons to do so is that ClojureScript supports core.async, which brings a number of benefits that we’ll explore today, not the least of which is a remedy to the bane of the JavaScript developer’s life—callback hell.

Concurrency Is a State of Mind

If you’ve done any significant client-side JavaScript programming, you’re probably wondering if I’ve gone mad—browser-based JavaScript engines are single threaded, so what relevance can core.async possibly have? Don’t you need multiple threads for concurrent programming to be useful?

The go macro’s inversion of control magic means that ClojureScript can bring the appearance of multiple threads to client-side programming even in the absence of true multithreading. This is a form of cooperative multitasking—one task won’t preemptively interrupt another. As we’ll see, this enables dramatic improvements in code structure and clarity.

Joe asks:

What About Web Workers?

Recent browsers support a limited form of truly multithreaded JavaScript via web workers.[38] Web workers are intended for background tasks only, however, and don’t have access to the DOM.

Web workers can be used in ClojureScript via, for example, the Servant library.[39]

Hello, ClojureScript

ClojureScript is very similar to Clojure, but there are a few differences—we’ll mention those that will affect us as we run into them.

A typical ClojureScript application has a two-stage compilation process. First, the client-side ClojureScript is compiled to create a JavaScript file, and then the server-side code is compiled and run to create a server that serves pages with that JavaScript included within a <script> tag. Today’s examples all make use of the lein-cljsbuild Leiningen plugin to automate this build process.[40] The server-side code resides in src-clj, and the client-side code in src-cljs.

Let’s look at a simple example project, comprising a single page with a single script. Here’s the page:


1: <html>
2:   <head>
3:     <title>Hello ClojureScript</title>
4:     <script src="/js/main.js" type="text/javascript"></script>
5:   </head>
6:   <body>
7:     <div id="content"></div>
8:   </body>
9: </html>
The generated JavaScript is included on line 4. That script will populate the empty <div> on line 7. Here’s its source:


1: (ns hello-clojurescript.core
2:   (:require-macros [cljs.core.async.macros :refer [go]])
3:   (:require [goog.dom :as dom]
4:             [cljs.core.async :refer [<! timeout]]))
5:
6: (defn output [elem message]
7:   (dom/append elem message (dom/createDom "br")))
8:
9: (defn start []
10:   (let [content (dom/getElement "content")]
11:     (go (while true
12:           (<! (timeout 1000))
13:           (output content "Hello from task 1")))
14:
15:     (go (while true
16:           (<! (timeout 1500))
17:           (output content "Hello from task 2")))))
18:
19: (set! (.-onload js/window) start)

One difference between Clojure and ClojureScript is that any macros used by a script need to be referenced separately with :require-macros (line 2). The output function on line 6 uses the Google Closure library (that’s closure with an s, not a j) to append a message to a DOM element.[41]

This function is used on lines 13 and 17, each of which is within independently running go blocks. The first prints a message once every second, the other once every second and a half.

Finally, on line 19, our start function is set to run by associating it with the JavaScript window object’s onload attribute. This uses ClojureScript’s dot special form, which provides JavaScript interoperability and takes this:

(set! (.-onload js/window) start)

and translates it into this:

window.onload = hello_clojurescript.core.start;

We won’t look at the code for the server here, since it’s very simple (see the accompanying code if you’re interested in the details).

Compile the script with lein cljsbuild once, run the server with lein run, and point your browser at http://localhost:3000. You should see something like this:

Hello from task 1

Hello from task 2

Hello from task 1

Hello from task 1

Hello from task 2

Who says you need threads to have concurrency?

Independently running concurrent tasks are great as far as they go, but most user interfaces need to interact with the user, which means handling events, the next thing we’ll look at.

Handling Events

We’ll see how event handling works in ClojureScript by creating a simple animation that reacts to mouse clicks. We’re going to create a web page that displays circles that shrink to a point and eventually disappear wherever the user clicks, as shown in Figure 11, Shrinking circles.


Figure 11. Shrinking circles

The code for the page is very simple, comprising a single <div> that fills the entire window:





<html>
  <head>
    <script src="/js/main.js" type="text/javascript"></script>
  </head>
  <body>
    <div id="canvas" width="100%" height="100%"></div>
  </body>
</html>



To draw on this page, we’re going to make use of Google Closure’s graphics support, which abstracts away from the details of drawing within different browsers. The create-graphics function takes a DOM element and returns an object that allows us to use it as a graphics surface:


(defn create-graphics [elem]
  (doto (graphics/createGraphics "100%" "100%")
    (.render elem)))

And here’s shrinking-circle, which takes such a graphics surface and a position and creates a go block that animates a circle centered on the position:


1: (def stroke (graphics/Stroke. 1 "#ff0000"))
2:
3: (defn shrinking-circle [graphics x y]
4:   (go
5:     (let [circle (.drawCircle graphics x y 100 stroke nil)]
6:       (loop [r 100]
7:         (<! (timeout 25))
8:         (.setRadius circle r r)
9:         (when (> r 0)
10:           (recur (dec r))))
11:       (.dispose circle))))

We start by creating a circle with Google Closure’s drawCircle function (line 5) and then enter a loop that uses a 25 ms timeout (line 7) to call setRadius forty times a second. Finally, when the radius has decreased to zero, we delete the circle with dispose (line 11).

Now we need a way to tell when the user clicks the mouse on the page. Google Closure provides the listen function, which allows us to register event listeners. Like the http/get function we saw yesterday, this takes a callback function that’s called whenever an event is available. So as we did yesterday, we’re going to translate this into the core.async world by passing a callback that writes events to a channel:


(defn get-events [elem event-type]
  (let [ch (chan)]
    (events/listen elem event-type
                   #(put! ch %))
    ch))


We now have all we need to construct our script:


(defn start []
  (let [canvas (dom/getElement "canvas")
        graphics (create-graphics canvas)
        clicks (get-events canvas "click")]
    (go (while true
          (let [click (<! clicks)
                x (.-offsetX click)
                y (.-offsetY click)]
            (shrinking-circle graphics x y))))))

(set! (.-onload js/window) start)

We start by looking up the <div> that we’ll be using as our canvas, constructing a graphics object that allows us to draw on it, and getting hold of a channel of mouse-click events. Then we enter a loop that waits for a mouse click, extracts its coordinates with offsetX and offsetY, and creates an animated circle at that position.

This all seems very simple (and it is), but by moving from JavaScript’s callback-oriented world to core.async’s channel-oriented world, we’ve achieved something profound—a solution to callback hell.

Taming Callbacks

Callback hell is a term coined to describe the spaghetti code that results from JavaScript’s callback-heavy approach—callbacks calling callbacks calling callbacks, with various elements of state stashed away so that one callback can communicate with the next.

Moving to an asynchronous programming model provides us with a way out, as we’ll see next.

We’re Off to See the Wizard

A wizard is a common UI pattern in which the user is taken through a sequence of steps to achieve a goal. The last thing we’ll do today is use what we’ve learned to create a callback-less wizard.


Our wizard comprises a form with a number of fieldsets:


<form id="wizard" action="/wizard" method="post">
  <fieldset class="step" id="step1">
    <legend>Step 1</legend>
    <label>First Name:</label><input type="text" name="firstname" />
    <label>Last Name:</label><input type="text" name="lastname" />
  </fieldset>
  <fieldset class="step" id="step2">
    <legend>Step 2</legend>
    <label>Date of Birth:</label><input type="date" name="dob" />
    <label>Homepage:</label><input type="url" name="url" />
  </fieldset>
  <fieldset class="step" id="step3">
    <legend>Step 3</legend>
    <label>Password:</label><input type="password" name="pass1" />
    <label>Confirm Password:</label><input type="password" name="pass2" />
  </fieldset>
  <input type="button" id="next" value="Next" />
</form>


Each <fieldset> represents a single step. We start with all of them hidden:


label { display:block; width:8em; clear:left; float:left;
        text-align:right; margin-right: 3pt; }
input { display:block; }
.step { display:none; }

Our script uses the following utility functions to show and hide the relevant fieldset as necessary:


(defn show [elem]
  (set! (.. elem -style -display) "block"))

(defn hide [elem]
  (set! (.. elem -style -display) "none"))

(defn set-value [elem value]
  (set! (.-value elem) value))

These use a variant of the dot special form that allows attribute accesses to be chained, which takes this:

(set! (.. elem -style -display) "block")

and translates it into this: = "block";

Here’s the code that implements the wizard control flow:


1: (defn start []
2:   (go
3:     (let [wizard (dom/getElement "wizard")
4:           step1 (dom/getElement "step1")
5:           step2 (dom/getElement "step2")
6:           step3 (dom/getElement "step3")
7:           next-button (dom/getElement "next")
8:           next-clicks (get-events next-button "click")]
9:       (show step1)
10:       (<! next-clicks)
11:       (hide step1)
12:       (show step2)
13:       (<! next-clicks)
14:       (set-value next-button "Finish")
15:       (hide step2)
16:       (show step3)
17:       (<! next-clicks)
18:       (.submit wizard))))
19:
20: (set! (.-onload js/window) start)

We start by getting references to each of the form elements we’ll be dealing with and use the get-events function we wrote earlier to get a channel of “Next” button clicks (line 8). Then it’s a simple case of showing the first step and waiting for the user to click Next (line 10). When the user clicks, we hide step 1, show step 2, and wait for another click on Next. This continues until every step has been completed, at which point we submit the form (line 18).

What stands out about this code is how unremarkable it is—a wizard is a simple linear sequence of steps, and this code reads like a simple linear sequence of steps. Of course, thanks to the magic of the go macro, we know that it’s no such thing—what we’ve actually created is a state machine that runs when it can and parks when it’s waiting for the stimulus that allows it to perform a state transition. But almost all of the time, we can ignore that fact and treat it like the linear code it appears to be.
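The same “linear code over an event channel” structure shows up in Go, where a receive suspends the goroutine just as <! parks a go block. The channel and function names in this sketch are invented for illustration:

```go
package main

import "fmt"

// wizard reads like a plain linear sequence of steps, even though each
// receive from clicks suspends the goroutine until the "user" clicks Next.
// Progress messages are reported on the ui channel.
func wizard(clicks <-chan struct{}, ui chan<- string) {
	ui <- "show step1"
	<-clicks
	ui <- "show step2"
	<-clicks
	ui <- "show step3"
	<-clicks
	ui <- "submit"
	close(ui)
}

func main() {
	clicks := make(chan struct{})
	ui := make(chan string)
	go wizard(clicks, ui)
	go func() {
		for i := 0; i < 3; i++ {
			clicks <- struct{}{} // simulate the user clicking Next
		}
	}()
	for msg := range ui {
		fmt.Println(msg)
	}
}
```

As with the ClojureScript version, the control flow is a state machine in disguise, but we can read and write it as straight-line code.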

Day 3 Wrap-Up

This brings us to the end of day 3 and our discussion of core.async’s version of communicating sequential processes.

What We Learned in Day 3

ClojureScript is a Clojure variant that cross-compiles to JavaScript, allowing the power of core.async to be brought to bear on client-side development. Not only does this bring a form of cooperative multitasking to single-threaded JavaScript environments, but it also provides a respite from callback hell.

Day 3 Self-Study


· The ClojureScript implementation of core.async supports parking operations like <! and >!, but not their blocking equivalents <!! or >!!. Why not?

· Find the documentation for take!. How would you use it to convert a channel-based API into a callback-based API? When might this be useful? (Hint: This may be related to the previous question.)


· Use core.async to create a simple browser-based game like Snake, Pong, or Breakout.

· Create a native JavaScript version of the wizard we implemented earlier today. How does it compare to the ClojureScript version?


On the surface, actor and CSP programs are very similar—both are constructed from independent, concurrently executing tasks that communicate by sending each other messages. But as we’ve seen in this chapter, their different emphases result in very different flavors.


The primary strength of CSP compared to actors is flexibility. In an actor program, the medium of communication is tightly coupled to the unit of execution—each actor has precisely one mailbox. In a CSP program, by contrast, channels are first class and can be independently created, written to, read from, and passed between tasks.
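Go makes this flexibility concrete: because channels are ordinary values, one channel can be sent over another, giving a request-reply pattern in which the client chooses the reply address. An actor's single fixed mailbox can't express this directly. A minimal sketch:

```go
package main

import "fmt"

// startServer returns a channel of request channels: each request carries
// the very channel the server should reply on - a channel sent over a
// channel, something only possible because channels are first-class values.
func startServer() chan<- chan string {
	requests := make(chan chan string)
	go func() {
		for reply := range requests {
			reply <- "pong"
		}
	}()
	return requests
}

func main() {
	requests := startServer()
	reply := make(chan string)
	requests <- reply // pass the reply channel itself over a channel
	fmt.Println(<-reply) // prints "pong"
}
```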

Rich Hickey, creator of the Clojure language, explained his reasons for choosing CSP over actors like this:[42]

I remain unenthusiastic about actors. They still couple the producer with the consumer. Yes, one can emulate or implement certain kinds of queues with actors (and, notably, people often do), but since any actor mechanism already incorporates a queue, it seems evident that queues are more primitive.

From a more pragmatic point of view, modern implementations of CSP like core.async that use inversion of control to provide asynchronous tasks bring both efficiency and a dramatically improved programming model to application areas that have traditionally been based on callbacks. We’ve seen two of these—asynchronous IO and UI programming—but there are many others.


If you compare this chapter with the previous one on actors, two topics are conspicuous by their absence—distribution and fault tolerance. Although there’s nothing whatsoever to stop CSP-based languages from supporting both, historically neither has had the same level of focus and support as either has had within actor-based languages—there’s no CSP equivalent of OTP.

As with both threads and locks and actors, CSP programs are susceptible to deadlock and have no direct support for parallelism. Parallel solutions need to be built from concurrent building blocks, raising the specter of nondeterminism.

Other Languages

Like actors, CSP has been around since the 1970s, when it was introduced by Tony Hoare. The two models have largely coevolved, each learning from the other over the years.

In the 1980s, CSP formed the basis for the language occam (upon which this author cut his parallel-programming teeth),[43] but without question the language that’s done most to popularize the model recently is Go.

The inversion of control–based approach to asynchronous tasks provided by both core.async and Go is becoming widely adopted, with support available in, among others, F#,[44] C#,[45] Nemerle,[46] and Scala.[47]

Final Thoughts

Most of the differences between actors and CSP result from the differing focus of the communities that have developed around them. The actor community has concentrated on fault tolerance and distribution, and the CSP community on efficiency and expressiveness. Choosing between them, therefore, is largely a question of deciding which of these aspects is most important to you.

CSP is the last general-purpose programming model we’ll be looking at. In the next chapter we’re going to look at our first special-purpose model.