Actors - Seven Concurrency Models in Seven Weeks (2014)

Chapter 5. Actors

An actor is like a rental car—quick and easy to get a hold of when you want one, and if it breaks down you don’t bother trying to fix it; you just call the rental company and another one is delivered to you.

The actor model is a general-purpose concurrent programming model with particularly wide applicability. It targets both shared- and distributed-memory architectures, facilitates geographical distribution, and provides especially strong support for fault tolerance and resilience.

More Object-Oriented than Objects

Functional programming avoids the problems associated with shared mutable state by avoiding mutable state. Actor programming, by contrast, retains mutable state but avoids sharing it.

An actor is like an object in an object-oriented (OO) program—it encapsulates state and communicates with other actors by exchanging messages. The difference is that actors run concurrently with each other and, unlike OO-style message passing (which is really just calling a method), actors really communicate by sending messages to each other.

Although the actor model is a general approach to concurrency that can be used with almost any language, it’s most commonly associated with Erlang.[25] We’re going to cover actors in Elixir,[26] a relatively new language that runs on the Erlang virtual machine (BEAM).

Like Clojure (and Erlang), Elixir is an impure, dynamically typed functional language. If you’re familiar with Java or Ruby, you should find it easy enough to read. This isn’t going to be an Elixir tutorial (this is a book about concurrency, after all, not programming languages), but I’ll introduce the important language features we’re using as we go along. There may be things you just have to take on faith if you’re not already familiar with the language—I recommend Programming Elixir [Tho14] if you want to go deeper.

In day 1 we’ll see the basics of the actor model—creating actors and sending and receiving messages. In day 2 we’ll see how failure detection, coupled with the “let it crash” philosophy, allows actor programs to be fault-tolerant. Finally, in day 3 we’ll see how actors’ support for distributed programming allows us to both scale beyond a single machine and recover from failure of one or more of those machines.

Day 1: Messages and Mailboxes

Today we’ll see how to create and stop processes, send and receive messages, and detect when a process has terminated.

Joe asks:

Actor or Process?

In Erlang, and therefore Elixir, an actor is called a process. In most environments a process is a heavyweight entity that consumes lots of resources and is expensive to create. An Elixir process, by contrast, is very lightweight—lighter weight even than most systems’ threads, both in terms of resource consumption and startup cost. Elixir programs typically create thousands of processes without problems and don’t normally need to resort to the equivalent of thread pools (see Thread-Creation Redux).

Our First Actor

Let’s dive straight in with an example of creating a simple actor and sending it some messages. We’re going to construct a “talker” actor that knows how to say a few simple phrases in response to messages.

The messages we’ll be sending are tuples—sequences of values. In Elixir, a tuple is written using curly brackets, like this:

{:foo, "this", 42}

This is a 3-tuple (or triple), where the first element is an atom (Elixir’s atoms are very similar to Clojure’s keywords, even down to the initial colon syntax), the second a string, and the third an integer.
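Tuples like this are taken apart with pattern matching, which the actor code below relies on. Here is a minimal sketch; the variable names are chosen only for illustration:

```elixir
# A tuple groups a fixed number of values; the leading :foo acts as a tag.
message = {:foo, "this", 42}

# Matching against a pattern of the same shape binds the variables.
{:foo, text, number} = message
IO.puts("text=#{text}, number=#{number}")

# Tuples can also be inspected by position (indices start at zero).
IO.inspect(tuple_size(message))
IO.inspect(elem(message, 0))
```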

Here’s the code for our actor:

Actors/hello_actors/hello_actors.exs

defmodule Talker do
  def loop do
    receive do
      {:greet, name} -> IO.puts("Hello #{name}")
      {:praise, name} -> IO.puts("#{name}, you're amazing")
      {:celebrate, name, age} -> IO.puts("Here's to another #{age} years, #{name}")
    end
    loop()
  end
end

We’ll pick through this code in more detail soon, but we’re defining an actor that knows how to receive three different kinds of messages and prints an appropriate string when it receives each of them.

Here’s code that creates an instance of our actor and sends it a few messages:

Actors/hello_actors/hello_actors.exs

pid = spawn(&Talker.loop/0)

send(pid, {:greet, "Huey"})

send(pid, {:praise, "Dewey"})

send(pid, {:celebrate, "Louie", 16})

:timer.sleep(1000)

First, we spawn an instance of our actor, receiving a process identifier that we bind to the variable pid. A process simply executes a function, in this case the loop function within the Talker module, which takes zero arguments.

Next, we send three messages to our newly created actor and finally sleep for a while to give it time to process those messages (using sleep isn’t the best approach—we’ll see how to do this better soon).

Here’s what you should see when you run it:

Hello Huey

Dewey, you're amazing

Here's to another 16 years, Louie

Now that we’ve seen how to create an actor and send messages to it, let’s see what’s going on under the hood.

Mailboxes Are Queues

One of the most important features of actor programming is that messages are sent asynchronously. Instead of being sent directly to an actor, they are placed in a mailbox, a queue from which the actor takes them one at a time.

This means that actors are decoupled—actors run at their own speed and don’t block when sending messages.

An actor runs concurrently with other actors but handles messages sequentially, in the order they were added to the mailbox, moving on to the next message only when it’s finished processing the current message. We only have to worry about concurrency when sending messages.
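The queueing behavior can be seen without spawning anything, because the current process has a mailbox too. This is a small sketch, with the {:msg, n} message shape made up for the example:

```elixir
# Queue three messages in the current process's own mailbox.
# send/2 returns immediately; nothing has been processed yet.
for n <- 1..3, do: send(self(), {:msg, n})

# Take them out one at a time; they come back in the order they were sent.
received =
  for _ <- 1..3 do
    receive do
      {:msg, n} -> n
    end
  end

IO.inspect(received)
```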

Receiving Messages

An actor typically sits in an infinite loop, waiting for a message to arrive with receive and then processing it. Here’s Talker’s loop again:

Actors/hello_actors/hello_actors.exs

def loop do
  receive do
    {:greet, name} -> IO.puts("Hello #{name}")
    {:praise, name} -> IO.puts("#{name}, you're amazing")
    {:celebrate, name, age} -> IO.puts("Here's to another #{age} years, #{name}")
  end
  loop()
end

This function implements an infinite loop by calling itself recursively. The receive block waits for a message and then uses pattern matching to work out how to handle it. Incoming messages are compared against each pattern in turn. If a message matches, the variables in the pattern (name and age) are bound to the values in the message and the code to the right of the arrow (->) is executed. That code prints a message constructed using string interpolation: the code within each #{…} is evaluated and the resulting value inserted into the string.

Joe asks:

Won’t Infinite Recursion Blow Up the Stack?

You might be worried that a function like Talker’s loop, which recurses infinitely, would result in the stack growing forever. Happily, there’s no need to worry—in common with many functional languages (Clojure being a prominent exception—see What Is Loop/Recur?), Elixir implements tail-call elimination. Tail-call elimination, as its name suggests, replaces a recursive call with a simple jump if the last thing the function does is call itself.
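As a rough illustration, the following sketch recurses ten million times. Countdown is a hypothetical module written for this example, not part of the chapter's code:

```elixir
defmodule Countdown do
  # The recursive call is the last thing the function does, so it is a
  # tail call and the stack does not grow with n.
  def run(0), do: :done
  def run(n) when n > 0, do: run(n - 1)
end

IO.inspect(Countdown.run(10_000_000))
```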

Our script still sleeps for a second to allow the messages to be processed before it exits. This is an unsatisfactory solution, and we can do better.

Linking Processes

We need two things to be able to shut down cleanly. First we need a way to tell our actor to stop when it’s finished processing all the messages in its queue. And second, we need some way to know that it has done so.

We can achieve the first of these by having our actor handle an explicit shutdown message (similar to the poison pill we saw earlier):

Actors/hello_actors/hello_actors2.exs

defmodule Talker do
  def loop do
    receive do
      {:greet, name} -> IO.puts("Hello #{name}")
      {:praise, name} -> IO.puts("#{name}, you're amazing")
      {:celebrate, name, age} -> IO.puts("Here's to another #{age} years, #{name}")
      {:shutdown} -> exit(:normal)   # new: stop when asked
    end
    loop()
  end
end

And second, we need a way to tell that it has exited, which we can do by setting :trap_exit to true and linking to it by using spawn_link instead of spawn:

Actors/hello_actors/hello_actors2.exs

Process.flag(:trap_exit, true)

pid = spawn_link(&Talker.loop/0)

This means that we’ll be notified (with a system-generated message) when the spawned process terminates. The message that’s sent is a triple of this form:

{:EXIT, pid, reason}

All that remains is to send the shutdown message and listen for the exit message:

Actors/hello_actors/hello_actors2.exs

send(pid, {:greet, "Huey"})
send(pid, {:praise, "Dewey"})
send(pid, {:celebrate, "Louie", 16})
send(pid, {:shutdown})
receive do
  {:EXIT, ^pid, reason} -> IO.puts("Talker has exited (#{reason})")
end

The ^ (caret) in the receive pattern indicates that instead of binding the second element of the tuple to pid, we want to match a message where the second element has the value that’s already bound to pid.
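The difference between binding and pinning is easiest to see side by side. In this sketch, plain atoms stand in for process identifiers, and the names expected, who, and check are hypothetical:

```elixir
expected = :worker_a

# Without the caret, a variable in a pattern is simply bound to whatever is there.
{:EXIT, who, _reason} = {:EXIT, :worker_b, :normal}
IO.inspect(who)

# With the caret, the value already bound to expected must be present.
check = fn message ->
  case message do
    {:EXIT, ^expected, reason} -> {:matched, reason}
    _other -> :ignored
  end
end

IO.inspect(check.({:EXIT, :worker_a, :normal}))
IO.inspect(check.({:EXIT, :worker_b, :normal}))
```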

Here’s what you should see if you run this new version:

Hello Huey

Dewey, you're amazing

Here's to another 16 years, Louie

Talker has exited (normal)

We’ll talk about linking in much more detail tomorrow.

Stateful Actors

Our Talker actor is stateless. It’s tempting to think that you would need mutable variables to create a stateful actor, but in fact all we need is recursion. Here, for example, is an actor that maintains a count that increments each time it receives a message:

Actors/counter/counter.ex

defmodule Counter do

def loop(count) do

receive do

{:next} ->

IO.puts("Current count: #{count}")

loop(count + 1)

end

end

end

Let’s see this in action in Interactive Elixir, iex (the Elixir REPL):

iex(1)> counter = spawn(Counter, :loop, [1])

#PID<0.47.0>

iex(2)> send(counter, {:next})

Current count: 1

{:next}

iex(3)> send(counter, {:next})

{:next}

Current count: 2

iex(4)> send(counter, {:next})

{:next}

Current count: 3

We start by using the three-argument form of spawn, which takes a module name, the name of a function within that module, and a list of arguments, so that we can pass an initial count to Counter.loop. Then, as we expect, it prints a different number each time we send it a {:next} message—a stateful actor with not a mutable variable in sight. And furthermore, this is an actor that can safely access that state without any concurrency bugs, because messages are handled sequentially.

Hiding Messages Behind an API

Our Counter actor works, but it’s not very convenient to use. We need to remember which arguments to pass to spawn and exactly which message(s) it understands (is it {:next}, :next, or {:increment}?). With that in mind, instead of calling spawn and sending messages directly to an actor, it’s common practice to provide a set of API functions:

Actors/counter/counter.ex

defmodule Counter do
  def start(count) do
    spawn(__MODULE__, :loop, [count])
  end

  def next(counter) do
    send(counter, {:next})
  end

  def loop(count) do
    receive do
      {:next} ->
        IO.puts("Current count: #{count}")
        loop(count + 1)
    end
  end
end

The implementation of start makes use of the pseudo-variable __MODULE__, which evaluates to the name of the current module. These functions make using our actor much neater and less error prone:

iex(1)> counter = Counter.start(42)

#PID<0.44.0>

iex(2)> Counter.next(counter)

Current count: 42

{:next}

iex(3)> Counter.next(counter)

{:next}

Current count: 43

An actor that simply prints its state isn’t very useful. Next we’ll see how to implement bidirectional communication so that one actor can query another.

Bidirectional Communication

As we’ve already seen, messages between actors are sent asynchronously—the sender doesn’t block. But what happens if we want to receive a reply? What if, for example, we want our Counter actor to return the next number rather than just printing it?

The actor model doesn’t provide direct support for replies, but it’s something we can build for ourselves very easily by including the identifier of the sending process in the message, which allows the recipient to send a reply:

Actors/counter/counter2.ex

defmodule Counter do
  def start(count) do
    spawn(__MODULE__, :loop, [count])
  end

  def next(counter) do
    ref = make_ref()
    send(counter, {:next, self(), ref})
    receive do
      {:ok, ^ref, count} -> count
    end
  end

  def loop(count) do
    receive do
      {:next, sender, ref} ->
        send(sender, {:ok, ref, count})
        loop(count + 1)
    end
  end
end

Instead of printing the count, this version sends it back to the sender of the original message as a triple of the following form:

{:ok, ref, count}

Here ref is a unique reference generated by the sender with make_ref.

Joe asks:

Why Reply with a Tuple?

Our new version of Counter could have simply replied with the count instead of a tuple:

{:next, sender} ->

send(sender, count)

Although this would certainly work, idiomatic Elixir typically uses tuples as messages, where the first element indicates success or failure. In this instance, we also include a unique reference generated by the client, which ensures that the reply will be correctly identified in the event that there are multiple messages waiting in the client’s mailbox.

Let’s prove that it works:

iex(1)> counter = Counter.start(42)

#PID<0.47.0>

iex(2)> Counter.next(counter)

42

iex(3)> Counter.next(counter)

43
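The reference earns its keep when the caller's mailbox already holds other replies. The sketch below simulates that situation by sending both messages to the current process. The :stale_value and :fresh_value payloads are invented for the example:

```elixir
# A stale reply, perhaps to an earlier request, is already in the mailbox.
stale_ref = make_ref()
send(self(), {:ok, stale_ref, :stale_value})

# The reply to the request we care about arrives after it.
ref = make_ref()
send(self(), {:ok, ref, :fresh_value})

# Pinning ref skips the stale message even though it is first in the queue.
value =
  receive do
    {:ok, ^ref, v} -> v
  end

IO.inspect(value)
```

Without the reference, a pattern such as {:ok, v} would have matched the stale message first.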

We’ll make one further improvement to Counter before we move on—giving it a name to make it discoverable.

Naming Processes

A message is sent to a process, which means that you need to know its identifier. If it’s a process that you created, this is easy, but how do you send a message to a process that you didn’t create?

There are various ways to address this, but one of the most convenient is to register a name for the process:

iex(1)> pid = Counter.start(42)

#PID<0.47.0>

iex(2)> Process.register(pid, :counter)

true

iex(3)> counter = Process.whereis(:counter)

#PID<0.47.0>

iex(4)> Counter.next(counter)

42

We associate a process identifier with a name using Process.register and retrieve it with Process.whereis. We can see all registered processes with Process.registered:

iex(5)> Process.registered

[:kernel_sup, :init, :code_server, :user, :standard_error_sup,

:global_name_server, :application_controller, :file_server_2, :user_drv,

:kernel_safe_sup, :standard_error, :global_group, :error_logger,

:elixir_counter, :counter, :elixir_code_server, :erl_prim_loader, :elixir_sup,

:rex, :inet_db]

As you can see, the virtual machine automatically registers a number of standard processes at startup. Finally, as a convenience, send can take a process name instead of a process identifier directly:

iex(6)> send(:counter, {:next, self(), make_ref()})

{:next, #PID<0.45.0>, #Reference<0.0.0.107>}

iex(7)> receive do msg -> msg end

{:ok, #Reference<0.0.0.107>, 43}

We can use this to modify Counter’s API so that it doesn’t require a process identifier each time we call it:

Actors/counter/counter3.ex

def start(count) do
  pid = spawn(__MODULE__, :loop, [count])
  Process.register(pid, :counter)
  pid
end

def next do
  ref = make_ref()
  send(:counter, {:next, self(), ref})
  receive do
    {:ok, ^ref, count} -> count
  end
end

Here it is in use:

iex(1)> Counter.start(42)

#PID<0.47.0>

iex(2)> Counter.next

42

iex(3)> Counter.next

43

The last thing we’ll do today is use what we’ve seen to create a parallel map function similar to Clojure’s pmap. But first a brief interlude.

Interlude—First-Class Functions

As in all functional languages, functions in Elixir are first class: we can bind them to variables, pass them as arguments, and generally treat them as data. Here, for example, is an iex session that shows how we can pass an anonymous function to Enum.map to double every element in a list:

iex(1)> Enum.map([1, 2, 3, 4], fn(x) -> x * 2 end)

[2, 4, 6, 8]

Elixir also provides a shorthand &(…) syntax for defining anonymous functions that’s similar to Clojure’s #(…) reader macro:

iex(2)> Enum.map([1, 2, 3, 4], &(&1 * 2))

[2, 4, 6, 8]

iex(3)> Enum.reduce([1, 2, 3, 4], 0, &(&1 + &2))

10

Given a variable that’s been bound to a function, we can call that function with the . (apply) operator:

iex(4)> double = &(&1 * 2)

#Function<erl_eval.6.80484245>

iex(5)> double.(3)

6

And finally, we can create functions that return functions:

iex(6)> twice = fn(fun) -> fn(x) -> fun.(fun.(x)) end end

#Function<erl_eval.6.80484245>

iex(7)> twice.(double).(3)

12

We now have all the tools we need to construct our parallel map.

Parallel Map

As we saw earlier, Elixir provides Enum.map, which maps a function over a collection, but it does so sequentially. Here’s an alternative that maps each element of the collection in parallel:

Actors/parallel/parallel.ex

defmodule Parallel do

def map(collection, fun) do

parent = self()

processes = Enum.map(collection, fn(e) ->

spawn_link(fn() ->

send(parent, {self(), fun.(e)})

end)

end)

Enum.map(processes, fn(pid) ->

receive do

{^pid, result} -> result

end

end)

end

end

This executes in two phases. In the first, it creates one process for each element of the collection (if the collection has 1,000 elements, it will create 1,000 processes). Each of these applies fun to the relevant element and sends the result back to the parent process. In the second phase, the parent waits for each result.

Let’s prove that it works:

iex(1)> slow_double = fn(x) -> :timer.sleep(1000); x * 2 end

#Function<6.80484245 in :erl_eval.expr/5>

iex(2)> :timer.tc(fn() -> Enum.map([1, 2, 3, 4], slow_double) end)

{4003414, [2, 4, 6, 8]}

iex(3)> :timer.tc(fn() -> Parallel.map([1, 2, 3, 4], slow_double) end)

{1001131, [2, 4, 6, 8]}

This uses :timer.tc, which times the execution of a function and returns a pair containing the time taken in microseconds together with the return value. You can see that the sequential version takes a little over four seconds, and the parallel version just over one second.
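One property worth checking is that results come back in input order even when the workers finish out of order. The sketch below repeats the Parallel module so that it runs on its own, and uses a made-up workload, slow_times_ten, in which earlier elements take longer:

```elixir
defmodule Parallel do
  # Same logic as the chapter's listing, repeated so the sketch is self-contained.
  def map(collection, fun) do
    parent = self()

    processes =
      Enum.map(collection, fn e ->
        spawn_link(fn -> send(parent, {self(), fun.(e)}) end)
      end)

    # Waiting for each pid in turn yields results in input order,
    # whatever order the replies actually arrive in.
    Enum.map(processes, fn pid ->
      receive do
        {^pid, result} -> result
      end
    end)
  end
end

# Hypothetical workload: earlier elements sleep longer and so finish last.
slow_times_ten = fn x ->
  :timer.sleep((4 - x) * 50)
  x * 10
end

IO.inspect(Parallel.map([1, 2, 3], slow_times_ten))
```

The pinned ^pid in the second phase is what preserves the order: a reply that arrives early simply waits in the mailbox until its turn comes.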

Day 1 Wrap-Up

This brings us to the end of day 1. In day 2 we’ll see how the actor model helps with error handling and resilience.

What We Learned in Day 1

Actors (processes) run concurrently, do not share state, and communicate by asynchronously sending messages to mailboxes. We covered how to do the following:

· Create a new process with spawn

· Send a message to a process with send

· Use pattern matching to handle messages

· Create a link between two processes and receive notification when one terminates

· Implement bidirectional, synchronous messaging on top of the standard asynchronous messaging

· Register a name for a process

Day 1 Self-Study

Find

· The Elixir library documentation

· The video of Erik Meijer and Clemens Szyperski talking to Carl Hewitt about the actor model at Lang.NEXT 2012

Do

· Measure the cost of creating a process on the Erlang virtual machine. How does it compare with the cost of creating a thread on the Java virtual machine?

· Measure the cost of the parallel map function we created compared to a sequential map. When would it make sense to use a parallel map, and when a sequential map?

· Write a parallel reduce function along the lines of the parallel map function we just created.

Day 2: Error Handling and Resilience

As we saw in Resilient Software for an Unpredictable World, one of the key benefits of concurrency is that it enables us to write fault-tolerant code. Today we’ll look at the tools that actors provide that enable us to do so.

First, though, let’s use the lessons from yesterday to create a slightly more complicated and realistic example, which we’ll use as the basis for today’s discussion.

A Caching Actor

We’re going to create a simple cache for webpages. A client can add a page to the cache by providing a URL together with the text of the page, query the cache for the page associated with a URL, and query the cache to see how many bytes it contains.

We’re going to use a dictionary to store the mapping from URL to page. Like a Clojure map, an Elixir dictionary is a persistent, associative data structure:

iex(1)> d = HashDict.new

#HashDict<[]>

iex(2)> d1 = Dict.put(d, :a, "A value for a")

#HashDict<[a: "A value for a"]>

iex(3)> d2 = Dict.put(d1, :b, "A value for b")

#HashDict<[a: "A value for a", b: "A value for b"]>

iex(4)> d2[:a]

"A value for a"

We create a new dictionary with HashDict.new, add entries to it with Dict.put(dict, key, value), and look up entries with dict[key].
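Later Elixir releases deprecated HashDict and Dict in favor of the built-in Map, so on a current installation the same session may produce warnings. As a sketch of the equivalent operations, assuming a release where Map is available:

```elixir
# Map is persistent in the same way: put returns a new map.
d = %{}
d1 = Map.put(d, :a, "A value for a")
d2 = Map.put(d1, :b, "A value for b")

IO.inspect(d2[:a])        # look up by key
IO.inspect(map_size(d))   # the original map is untouched
IO.inspect(map_size(d2))
```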

Here’s an implementation of our cache that makes use of the preceding:

Actors/cache/cache.ex

defmodule Cache do
  def loop(pages, size) do
    receive do
      {:put, url, page} ->
        new_pages = Dict.put(pages, url, page)
        new_size = size + byte_size(page)
        loop(new_pages, new_size)
      {:get, sender, ref, url} ->
        send(sender, {:ok, ref, pages[url]})
        loop(pages, size)
      {:size, sender, ref} ->
        send(sender, {:ok, ref, size})
        loop(pages, size)
      {:terminate} ->
        :ok # Terminate request - don't recurse
    end
  end
end

It maintains two items of state, pages and size. The first is a dictionary that maps URLs to pages; the second is an integer count of the number of bytes currently stored in the cache (updated with the byte_size function on line 6).

As before, rather than expecting clients to remember the details of how to start and send messages to this actor, we provide an API they can use. First up is start_link:

Actors/cache/cache.ex

def start_link do

pid = spawn_link(__MODULE__, :loop, [HashDict.new, 0])

Process.register(pid, :cache)

pid

end

This passes an empty dictionary and zero size to loop as its initial state, and it registers the resulting process with the name :cache. Finally we have put, get, size, and terminate functions:

Actors/cache/cache.ex

def put(url, page) do

send(:cache, {:put, url, page})

end

def get(url) do

ref = make_ref()

send(:cache, {:get, self(), ref, url})

receive do

{:ok, ^ref, page} -> page

end

end

def size do

ref = make_ref()

send(:cache, {:size, self(), ref})

receive do

{:ok, ^ref, s} -> s

end

end

def terminate do

send(:cache, {:terminate})

end

The put and terminate functions simply take their arguments, package them up as a tuple, and send them as a message. The get and size functions are slightly more complicated, since they both have to wait for a reply. To match each reply to its request, they send a unique reference, using the pattern we saw yesterday.

Here’s our actor in use:

iex(1)> Cache.start_link

#PID<0.47.0>

iex(2)> Cache.put("google.com", "Welcome to Google ...")

{:put, "google.com", "Welcome to Google ..."}

iex(3)> Cache.get("google.com")

"Welcome to Google ..."

iex(4)> Cache.size()

21

So far, so good—we can put an entry into our cache, get it back again, and see how large the cache is.

What happens if we call our actor with invalid parameters by trying to add a nil page, for example?

iex(5)> Cache.put("paulbutcher.com", nil)

{:put, "paulbutcher.com", nil}

iex(6)>

=ERROR REPORT==== 22-Aug-2013::16:18:41 ===

Error in process <0.47.0> with exit value: {badarg,[{erlang,byte_size,[nil],[]} …

** (EXIT from #PID<0.47.0>) {:badarg, [{:erlang, :byte_size, [nil], []}, …

Unsurprisingly, given that we didn’t write any code to check the arguments, this fails. In most languages, the only way to address this would be to add code that anticipates what kinds of bad arguments might be sent and to report an error when they are. Elixir gives us another option—separating error handling out into a separate supervisor process. This apparently simple step is transformative, allowing profound improvements in code clarity, maintainability, and reliability.

To see how to write such a supervisor, we need to understand links between processes in more detail.

Fault Detection

In Linking Processes, we used spawn_link to create a link between two processes so that we could detect when one of them terminated. Links are one of the most important concepts in Elixir programming—let’s investigate them in more depth.

Links Propagate Abnormal Termination

We can establish a link between two processes at any time with Process.link. Here’s a small actor that we can use to investigate how links behave:

Actors/links/links.ex

defmodule LinkTest do
  def loop do
    receive do
      {:exit_because, reason} -> exit(reason)
      {:link_to, pid} -> Process.link(pid)
      {:EXIT, pid, reason} -> IO.puts("#{inspect(pid)} exited because #{reason}")
    end
    loop()
  end
end

Let’s create a couple of instances of this actor, link them, and see what happens when one of them fails:

iex(1)> pid1 = spawn(&LinkTest.loop/0)

#PID<0.47.0>

iex(2)> pid2 = spawn(&LinkTest.loop/0)

#PID<0.49.0>

iex(3)> send(pid1, {:link_to, pid2})

{:link_to, #PID<0.49.0>}

iex(4)> send(pid2, {:exit_because, :bad_thing_happened})

{:exit_because, :bad_thing_happened}

We start by creating two instances of our actor and bind their process identifiers to pid1 and pid2. Then we create a link from pid1 to pid2. Finally, we tell pid2 to exit abnormally.

Immediately, we notice that there’s no message printed by pid1 describing why pid2 exited. This is because we haven’t set :trap_exit. Linking the processes has still had an effect, however, as we can see if we use Process.info to query the status of our two processes:

iex(5)> Process.info(pid2, :status)

nil

iex(6)> Process.info(pid1, :status)

nil

So both our processes have terminated, not just pid2. We’ll see how to fix this soon, but first let’s do another experiment.

Links Are Bidirectional

If we try the same experiment again but this time ask pid1 to exit, we see the same behavior—both our processes terminate:

iex(1)> pid1 = spawn(&LinkTest.loop/0)

#PID<0.47.0>

iex(2)> pid2 = spawn(&LinkTest.loop/0)

#PID<0.49.0>

iex(3)> send(pid1, {:link_to, pid2})

{:link_to, #PID<0.49.0>}

iex(4)> send(pid1, {:exit_because, :another_bad_thing_happened})

{:exit_because, :another_bad_thing_happened}

iex(5)> Process.info(pid1, :status)

nil

iex(6)> Process.info(pid2, :status)

nil

This is because links are bidirectional. Creating a link from pid1 to pid2 also creates a link in the other direction—if one of them fails, both of them do.

Normal Termination

Finally, let’s see what happens when one of our linked processes terminates normally (indicated by the special reason :normal):

iex(1)> pid1 = spawn(&LinkTest.loop/0)

#PID<0.47.0>

iex(2)> pid2 = spawn(&LinkTest.loop/0)

#PID<0.49.0>

iex(3)> send(pid1, {:link_to, pid2})

{:link_to, #PID<0.49.0>}

iex(4)> send(pid2, {:exit_because, :normal})

{:exit_because, :normal}

iex(5)> Process.info(pid2, :status)

nil

iex(6)> Process.info(pid1, :status)

{:status, :waiting}

So normal termination does not result in linked processes terminating.
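The last three experiments can be condensed into a single scripted check. This is only a sketch: LinkDemo and partner_survives? are hypothetical names, the 200-millisecond wait is an arbitrary choice, and it uses Process.monitor (a one-way notification that does not affect the observer) to watch the partner from the outside:

```elixir
defmodule LinkDemo do
  # Link a short-lived process to an idle partner, make it exit with
  # reason, and report whether the partner is still running afterwards.
  def partner_survives?(reason) do
    partner = spawn(fn -> :timer.sleep(:infinity) end)
    ref = Process.monitor(partner)

    spawn(fn ->
      Process.link(partner)
      exit(reason)
    end)

    receive do
      {:DOWN, ^ref, :process, ^partner, _why} ->
        false
    after
      200 ->
        # No exit was propagated; tidy up the survivor.
        Process.demonitor(ref, [:flush])
        Process.exit(partner, :kill)
        true
    end
  end
end

IO.inspect(LinkDemo.partner_survives?(:bad_thing_happened))
IO.inspect(LinkDemo.partner_survives?(:normal))
```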

System Processes

We can allow a process to trap another’s exit by setting its :trap_exit flag. This is known in the jargon as making it into a system process:

Actors/links/links.ex

def loop_system do
  Process.flag(:trap_exit, true)
  loop()
end

Here it is in action:

iex(1)> pid1 = spawn(&LinkTest.loop_system/0)

#PID<0.47.0>

iex(2)> pid2 = spawn(&LinkTest.loop/0)

#PID<0.49.0>

iex(3)> send(pid1, {:link_to, pid2})

{:link_to, #PID<0.49.0>}

iex(4)> send(pid2, {:exit_because, :yet_another_bad_thing_happened})

{:exit_because, :yet_another_bad_thing_happened}

#PID<0.49.0> exited because yet_another_bad_thing_happened

iex(5)> Process.info(pid2, :status)

nil

iex(6)> Process.info(pid1, :status)

{:status, :waiting}

This time, we use loop_system to start pid1. Not only does this mean that it’s notified when pid2 has exited (and prints a message to that effect), but it also continues to execute.
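The same mechanism can be tried in a few lines by making the current process a system process. In this sketch the exit reason :boom and the one-second timeout are arbitrary choices for the example:

```elixir
# Trap exits in the current process, then link to a child that fails.
Process.flag(:trap_exit, true)
child = spawn_link(fn -> exit(:boom) end)

# The failure arrives as an ordinary message instead of terminating us.
outcome =
  receive do
    {:EXIT, ^child, reason} -> {:child_exited, reason}
  after
    1000 -> :no_exit_message
  end

IO.inspect(outcome)
```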

Supervising a Process

We now have enough tools at our fingertips to implement a supervisor, a system process that monitors one or more worker processes and takes appropriate action if they fail.

Here’s a supervisor for the cache actor we created earlier that simply restarts its supervisee, the cache actor, if (when) it fails:

Actors/cache/cache.ex

defmodule CacheSupervisor do
  def start do
    spawn(__MODULE__, :loop_system, [])
  end

  def loop do
    pid = Cache.start_link()
    receive do
      {:EXIT, ^pid, :normal} ->
        IO.puts("Cache exited normally")
        :ok
      {:EXIT, ^pid, reason} ->
        IO.puts("Cache failed with reason #{inspect reason} - restarting it")
        loop()
    end
  end

  def loop_system do
    Process.flag(:trap_exit, true)
    loop()
  end
end

This actor starts by marking itself as a system process and then enters loop, which spawns Cache.loop and then blocks until that process exits. If it exits normally, then so does the supervisor (by returning :ok), but if it exits for any other reason, loop recurses and respawns the cache.

Instead of starting an instance of Cache ourselves, we now start CacheSupervisor instead, which creates an instance of Cache on our behalf:

iex(1)> CacheSupervisor.start

#PID<0.47.0>

iex(2)> Cache.put("google.com", "Welcome to Google ...")

{:put, "google.com", "Welcome to Google ..."}

iex(3)> Cache.size

21

If Cache crashes, it’s automatically restarted:

iex(4)> Cache.put("paulbutcher.com", nil)

{:put, "paulbutcher.com", nil}

Cache failed with reason {:badarg, [{:erlang, :byte_size, [nil], []}, …

iex(5)>

=ERROR REPORT==== 22-Aug-2013::17:49:24 ===

Error in process <0.48.0> with exit value: {badarg,[{erlang,byte_size,[nil],[]}, …

iex(5)> Cache.size

0

iex(6)> Cache.put("google.com", "Welcome to Google ...")

{:put, "google.com", "Welcome to Google ..."}

iex(7)> Cache.get("google.com")

"Welcome to Google ..."

We lose whatever was in the cache when it crashed, of course, but at least there’s still a cache for us to use subsequently.

Timeouts

Automatically restarting the cache is great, but it’s not a panacea. If two processes both send messages to the cache at around the same time, for example, we might see the following sequence:

1. Process 1 sends a :put message to the cache.

2. Process 2 sends a :get message to the cache.

3. The cache crashes while processing process 1’s message.

4. The supervisor restarts the cache, but process 2’s message is lost.

5. Process 2 is now deadlocked in a receive, waiting for a reply that will never arrive.

We can handle this by ensuring that our receive times out after a while by adding an after clause. Here’s a modified version of get (we’ll need to make the same change to size as well):

Actors/cache/cache2.ex

def get(url) do
  ref = make_ref()
  send(:cache, {:get, self(), ref, url})
  receive do
    {:ok, ^ref, page} -> page
  after
    1000 -> nil
  end
end
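Returning nil on timeout means a caller cannot tell a timeout from a URL that simply is not in the cache. One possible variation, sketched here with a hypothetical Reply.await helper, is to return tagged tuples so the two cases stay distinct:

```elixir
defmodule Reply do
  # Wait for {:ok, ref, value}; give up after timeout_ms milliseconds.
  def await(ref, timeout_ms) do
    receive do
      {:ok, ^ref, value} -> {:ok, value}
    after
      timeout_ms -> {:error, :timeout}
    end
  end
end

ref = make_ref()

# Nothing has been sent yet, so this times out.
IO.inspect(Reply.await(ref, 100))

# Once a matching reply is in the mailbox, it is returned at once.
send(self(), {:ok, ref, "page"})
IO.inspect(Reply.await(ref, 100))
```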

Joe asks:

Is Message Delivery Guaranteed?

The problem we just looked at, of a client’s message being lost when our cache is restarted, is just one example of a more general problem—what guarantees about message delivery does Elixir provide?

There are two basic guarantees:

· Message delivery is guaranteed if nothing breaks.

· If something does break, you’ll know about it (assuming you’ve linked to, or monitored, the process in question).

It’s this second guarantee that forms the bedrock of Elixir’s support for writing fault-tolerant code.
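The second guarantee can be put to work in a request/reply helper. The sketch below uses Process.monitor, which delivers a {:DOWN, ref, :process, pid, reason} message if the monitored process dies, and reuses the monitor reference to tag the request. Client.call and the crashing server are hypothetical, written only to illustrate the idea:

```elixir
defmodule Client do
  # Send request to pid and wait for a reply, but stop waiting as soon
  # as the monitor reports that the process has died.
  def call(pid, request) do
    ref = Process.monitor(pid)
    send(pid, {request, self(), ref})

    receive do
      {:ok, ^ref, reply} ->
        Process.demonitor(ref, [:flush])
        {:ok, reply}

      {:DOWN, ^ref, :process, ^pid, reason} ->
        {:error, reason}
    end
  end
end

# A server that crashes instead of replying.
server =
  spawn(fn ->
    receive do
      {_request, _sender, _ref} -> exit(:crashed)
    end
  end)

IO.inspect(Client.call(server, :size))
```

Unlike a timeout, this returns as soon as the failure is known and reports why the process died.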

The Error-Kernel Pattern

Tony Hoare famously said the following:[27]

There are two ways of constructing a software design: One way is to make it so simple that there are obviously no deficiencies and the other way is to make it so complicated that there are no obvious deficiencies.

Actor programming naturally supports an approach to writing fault-tolerant code that leverages this observation: the error-kernel pattern.

A software system’s error kernel is the part that must be correct if the system is to function correctly. Well-written programs make this error kernel as small and as simple as possible—so small and simple that there are obviously no deficiencies.

An actor program’s error kernel is its top-level supervisors. These supervise their children—starting, stopping, and restarting them as necessary.

Each module of a program has its own error kernel in turn—the part of the module that must be correct for it to function correctly. Submodules also have error kernels, and so on. This leads to a hierarchy of error kernels in which risky operations are pushed down toward the lower-level actors, as shown in Figure 8, A hierarchy of error kernels.

images/ErrorKernel2


Figure 8. A hierarchy of error kernels

Closely related to the error-kernel pattern is the thorny subject of defensive programming.

Let It Crash!

Defensive programming is an approach to achieving fault tolerance by trying to anticipate possible bugs. Imagine, for example, that we’re writing a function that takes a string and returns true if it’s all uppercase and false otherwise. Here’s one possible implementation:

def all_upper?(s) do
  String.upcase(s) == s
end

This is a perfectly reasonable function, but if for some reason we pass nil to it, it will crash. With that in mind, some developers would change it to read like this:

defmodule Upper do
  def all_upper?(s) do
    cond do
      nil?(s) -> false
      true -> String.upcase(s) == s
    end
  end
end

So now the code won’t crash if it’s given nil, but what if we pass something else that doesn’t make sense (a keyword, for example)? And in any case, what does it mean to call this function with nil? There’s an excellent chance that any code that does so contains a bug—a bug that we’ve now masked, meaning that we’re likely to remain unaware of it until it bites us at some time in the future.

Actor programs tend to avoid defensive programming and subscribe to the “let it crash” philosophy, allowing an actor’s supervisor to address the problem instead. This has multiple benefits, including these:

· Our code is simpler and easier to understand, with a clear separation between “happy path” and fault-tolerant code.

· Actors are separate from one another and don’t share state, so there’s little danger that a failure in one actor will adversely affect another. In particular, a failed actor’s supervisor cannot crash because the actor it’s supervising crashes.

· As well as fixing the error, a supervisor can log it so that instead of sweeping problems under the carpet, we become aware of them and can take remedial action.

Although it can seem alien at first acquaintance, the “let it crash” philosophy has, together with the error-kernel pattern, repeatedly been proven in production. Some systems have reported availability as high as 99.9999999% (that’s nine nines—see Programming Erlang: Software for a Concurrent World [Arm13]).

Day 2 Wrap-Up

Day 1 introduced the basics of the actor model, and in day 2, we saw how it facilitates fault tolerance. In day 3 we’ll see how the actor model helps with distributed programming.

What We Learned in Day 2

Elixir provides fault detection by allowing processes to be linked, which can be used to create supervisors:

· Links are bidirectional—if process a is linked to process b, then b is also linked to a.

· Links propagate errors—if two processes are linked and one of them terminates abnormally, so will the other.

· If a process is marked as a system process, instead of exiting when a linked process terminates abnormally, it’s notified with an :EXIT message.
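The last point can be sketched in a few lines, assuming a recent Elixir release. The :boom exit reason is an arbitrary value chosen for the example.

```elixir
# Mark the current process as a system process: exits of linked
# processes now arrive as messages instead of terminating it.
Process.flag(:trap_exit, true)

# spawn_link/1 spawns and links atomically.
pid = spawn_link(fn -> exit(:boom) end)

outcome =
  receive do
    {:EXIT, ^pid, reason} -> {:exit, reason}
  after
    1000 -> :no_notification
  end

IO.inspect(outcome)
```

Without the call to Process.flag, the abnormal exit would propagate over the link and terminate the spawning process as well.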

Day 2 Self-Study

Find

· The documentation for Process.monitor—how does monitoring a process differ from linking? When might you use monitors and when links?

· How do exceptions work in Elixir? When might you choose to use exception handling instead of supervision and the “let it crash” pattern?

Do

· Messages that don’t match a pattern in a receive block remain in a process’s mailbox. Use this fact, together with timeouts, to implement a priority mailbox, in which high-priority messages are handled ahead of any low-priority messages that might have been sent earlier.

· Create a version of the cache we created in A Caching Actor that distributes cache entries across multiple actors according to a hash function. Create a supervisor that starts multiple cache actors and routes incoming messages to the appropriate cache worker. What action should this supervisor take if one of the cache workers fails?

Day 3: Distribution

Everything we’ve done so far has been on a single computer, but one of the actor model’s primary benefits compared to the models we’ve seen so far is that it supports distribution—sending a message to an actor on another machine is just as easy as sending it to one running locally.

Before talking about distribution, however, we’ll take a quick look at one of the most powerful reasons for using Elixir—the OTP library.

OTP

Over the previous two days, we built everything by hand in “raw” Elixir. This is a great way to understand what’s going on under the hood, but it would be both tedious and error prone if we had to write every worker and every supervisor from scratch every time. You won’t be surprised to hear that a library can automate much of this for us—that library is called OTP.

Joe asks:

What Does OTP Stand For?

Acronyms often take on a life of their own. IBM might theoretically stand for “International Business Machines,” but to most people IBM is just IBM: the acronym has become the name. Similarly BBC no longer really stands for “British Broadcasting Corporation,” and OTP no longer really stands for “Open Telecom Platform.”

Erlang (and therefore Elixir) originally started out in telecommunications, and many proven Erlang best practices have been codified in OTP. But very little of it is telecom-specific, so OTP is just OTP.

Before we see an example of OTP, we’ll take a brief interlude to examine how functions and pattern matching interact in Elixir.

Functions and Pattern Matching

So far we’ve only talked about pattern matching within receive, but it’s used throughout Elixir. In particular, every time you call a function, you’re performing a pattern match. Here’s a simple function that demonstrates this:

Actors/patterns/patterns.ex

defmodule Patterns do
  def foo({x, y}) do
    IO.puts("Got a pair, first element #{x}, second #{y}")
  end
end

We’re defining a function that takes a single argument and matches that argument against the pattern {x, y}. If we call it with a matching pair, the first element is bound to x and the second to y:

iex(1)> Patterns.foo({:a, 42})

Got a pair, first element a, second 42

:ok

If we call it with an argument that doesn’t match, we get an error:

iex(2)> Patterns.foo("something else")

** (FunctionClauseError) no function clause matching in Patterns.foo/1

patterns.ex:3: Patterns.foo("something else")

erl_eval.erl:569: :erl_eval.do_apply/6

src/elixir.erl:147: :elixir.eval_forms/3

We can add as many different definitions (or clauses) for a function as we need:

Actors/patterns/patterns.ex

def foo({x, y, z}) do
  IO.puts("Got a triple: #{x}, #{y}, #{z}")
end

When the function is called, the matching clause is executed:

iex(2)> Patterns.foo({:a, 42, "yahoo"})

Got a triple: a, 42, yahoo

:ok

iex(3)> Patterns.foo({:x, :y})

Got a pair, first element x, second y

:ok

Now let’s see how this is used when implementing a server in OTP.

Reimplementing Cache with GenServer

The first aspect of OTP we’ll look at is GenServer, a behaviour that allows us to automate the details of creating a stateful actor. We’ll use it to reimplement the cache we created yesterday.

If the spelling of behaviour looks slightly odd to you, that’s because behaviours are inherited from Erlang, and Erlang uses the British spelling. Because that’s how Elixir spells it, that’s how we’ll spell it here too.

A behaviour is very similar to an interface in Java—it defines a set of functions. A module specifies that it implements a behaviour with use:

Actors/cache/cache3.ex

defmodule Cache do
  use GenServer.Behaviour

  def handle_cast({:put, url, page}, {pages, size}) do
    new_pages = Dict.put(pages, url, page)
    new_size = size + byte_size(page)
    {:noreply, {new_pages, new_size}}
  end

  def handle_call({:get, url}, _from, {pages, size}) do
    {:reply, pages[url], {pages, size}}
  end

  def handle_call({:size}, _from, {pages, size}) do
    {:reply, size, {pages, size}}
  end
end

This version of Cache specifies that it implements GenServer.Behaviour and provides custom implementations of two functions, handle_cast and handle_call.

The first of these, handle_cast, handles messages that do not require a reply. It takes two arguments: the first is the message and the second is the current actor state. The return value is a pair of the form {:noreply, new_state}. In our case, we provide one handle_cast clause that handles :put messages.

The second, handle_call, handles messages that require a reply. It takes three arguments: the message, the sender, and the current state. The return value is a triple of the form {:reply, reply_value, new_state}. In our case, we provide two handle_call clauses, one that handles :get messages and one that handles :size messages. Note that like Clojure, Elixir uses variable names that start with an underscore (“_”) to indicate that they’re unused, hence _from.

As with our previous implementation, we provide an API that clients can use without having to remember the details of how to initialize and send messages:

Actors/cache/cache3.ex

def start_link do
  :gen_server.start_link({:local, :cache}, __MODULE__, {HashDict.new, 0}, [])
end

def put(url, page) do
  :gen_server.cast(:cache, {:put, url, page})
end

def get(url) do
  :gen_server.call(:cache, {:get, url})
end

def size do
  :gen_server.call(:cache, {:size})
end

Instead of using spawn_link, we use :gen_server.start_link. We send messages that don’t require a reply with :gen_server.cast and those that do with :gen_server.call.
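For readers on a current toolchain, the same server can be sketched as follows. This is an adaptation rather than the listing above: it assumes a recent Elixir release, where use GenServer, the GenServer module functions, and Map stand in for GenServer.Behaviour, :gen_server, and HashDict. The module name CacheSketch is hypothetical, and the server registers itself under its own module name rather than :cache.

```elixir
defmodule CacheSketch do
  use GenServer

  # Client API

  def start_link(_opts \\ []) do
    # State is {pages, total_size}, as in the listing above.
    GenServer.start_link(__MODULE__, {%{}, 0}, name: __MODULE__)
  end

  def put(url, page), do: GenServer.cast(__MODULE__, {:put, url, page})
  def get(url), do: GenServer.call(__MODULE__, {:get, url})
  def size, do: GenServer.call(__MODULE__, :size)

  # Server callbacks

  @impl true
  def init(state), do: {:ok, state}

  # cast: no reply is sent to the caller.
  @impl true
  def handle_cast({:put, url, page}, {pages, size}) do
    {:noreply, {Map.put(pages, url, page), size + byte_size(page)}}
  end

  # call: the second element of the return tuple is the reply.
  @impl true
  def handle_call({:get, url}, _from, {pages, _size} = state) do
    {:reply, Map.get(pages, url), state}
  end

  def handle_call(:size, _from, {_pages, size} = state) do
    {:reply, size, state}
  end
end

{:ok, _pid} = CacheSketch.start_link()
CacheSketch.put("google.com", "Welcome to Google")
IO.inspect(CacheSketch.get("google.com"))
IO.inspect(CacheSketch.size())
```

Because messages between a given pair of processes arrive in the order they were sent, the get that follows the put sees the stored page even though put does not wait for a reply.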

We’ll see this in action soon, but first we’ll see how to create a supervisor with OTP.

An OTP Supervisor

Here’s a cache supervisor implemented with OTP’s supervisor behaviour:

Actors/cache/cache3.ex

defmodule CacheSupervisor do
  use Supervisor.Behaviour

  def init(_args) do
    workers = [worker(Cache, [])]
    supervise(workers, strategy: :one_for_one)
  end
end

As its name suggests, the init function is called during startup. It takes a single argument (unused in this case) and simply creates a number of workers and sets them up to be supervised. In our case, we’re creating a single Cache worker and supervising it using a one-for-one restart strategy.

Joe asks:

What Is a Restart Strategy?

The OTP supervisor behaviour supports a number of different restart strategies, the two most common being one-for-one and one-for-all.

These strategies govern how a supervisor with multiple workers restarts failed workers. If a single worker fails, a supervisor using the one-for-all strategy will stop and restart all its workers (even those that didn’t fail). A supervisor using a one-for-one strategy, by contrast, will only restart the failed worker.

Other restart strategies are possible, but one of these two will suffice in the majority of cases.

As usual, we also provide an API for clients:

Actors/cache/cache3.ex

def start_link do
  :supervisor.start_link(__MODULE__, [])
end

Take a moment to prove to yourself that this works in a very similar way to the cache and supervisor we implemented from scratch yesterday (I won’t show the transcript here, since it’s very similar to what we’ve already seen).
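If you would like to check the difference between the one-for-one and one-for-all strategies on a current toolchain, the following sketch may help. It assumes a recent Elixir release and uses the Supervisor module directly rather than Supervisor.Behaviour; RestartSketch, its nested Worker, and the names :sketch_a and :sketch_b are all hypothetical and exist only for this experiment.

```elixir
defmodule RestartSketch do
  defmodule Worker do
    use GenServer

    def start_link(name), do: GenServer.start_link(__MODULE__, nil, name: name)

    @impl true
    def init(state), do: {:ok, state}
  end

  # Starts two workers under `strategy`, kills the first, and reports
  # which of the two ended up with a new pid.
  def run(strategy) do
    children = [
      Supervisor.child_spec({Worker, :sketch_a}, id: :sketch_a),
      Supervisor.child_spec({Worker, :sketch_b}, id: :sketch_b)
    ]

    {:ok, sup} = Supervisor.start_link(children, strategy: strategy)
    a = Process.whereis(:sketch_a)
    b = Process.whereis(:sketch_b)

    Process.exit(a, :kill)
    new_a = await_restart(:sketch_a, a)

    # A call to the supervisor returns only after it has finished
    # handling the failure, so any sibling restarts are complete.
    _ = Supervisor.which_children(sup)
    new_b = Process.whereis(:sketch_b)

    Supervisor.stop(sup)
    %{a_restarted: new_a != a, b_restarted: new_b != b}
  end

  # Polls until `name` is registered to a pid other than `old_pid`.
  defp await_restart(name, old_pid) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(10)
        await_restart(name, old_pid)
    end
  end
end

IO.inspect(RestartSketch.run(:one_for_one))
IO.inspect(RestartSketch.run(:one_for_all))
```

Under one-for-one only the killed worker gets a new pid; under one-for-all its sibling is restarted too. Expect the supervisor to log reports about the killed child while this runs.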

Joe asks:

What Else Does OTP Do?

As we can see from the preceding code, OTP saves us from writing some boilerplate code, but there’s much more to it than just that. It’s not obvious from what we’ve already seen, but servers and supervisors implemented with OTP provide much more functionality than the simple versions we created before. Among other things, they provide the following:

Better restart logic:

The simple supervisor we wrote for ourselves has a very dumb approach to restarting its worker—if it terminates abnormally, it’s restarted. If the worker process crashed immediately on startup, this supervisor would simply restart it over and over again forever. An OTP supervisor, by contrast, has a maximum restart frequency which, if exceeded, results in the supervisor itself terminating abnormally.

Debugging and logging:

An OTP server can be started with various options to enable logging and debugging, which can be very helpful during development.

Hot code swapping:

An OTP server can be upgraded dynamically without taking the whole system down.

Lots, lots more:

Release management, failover, automated scaling …

We won’t talk further about these features here, but they’re powerful reasons to prefer OTP over handwritten code in most circumstances.

Nodes

Whenever we create an instance of the Erlang virtual machine, we create a node. So far, we’ve only created a single node. Now we’ll see how to create and connect multiple nodes.

Connecting Nodes

For one node to connect to another, they both need to be named. We name a node by starting the Erlang virtual machine with the --name or --sname option. My MacBook Pro happens to have the IP address 10.99.1.50. If I run iex --sname node1@10.99.1.50 --cookie yumyum (the --cookie argument is explained in How Do I Manage My Cluster?) on my MacBook Pro, for example, I see the name reflected in the prompt:

iex(node1@10.99.1.50)1> Node.self

:"node1@10.99.1.50"

iex(node1@10.99.1.50)2> Node.list

[]

A node can query its name with Node.self and list the other nodes it knows about with Node.list. Right now that list is empty—let’s see how to populate it. If I start another Erlang virtual machine on another machine that has the IP address 10.99.1.92 with iex --sname node2@10.99.1.92 --cookie yumyum, I can connect to it from my MacBook Pro with Node.connect:

iex(node1@10.99.1.50)3> Node.connect(:"node2@10.99.1.92")

true

iex(node1@10.99.1.50)4> Node.list

[:"node2@10.99.1.92"]

Connections are bidirectional—my other machine now also knows about my MacBook Pro:

iex(node2@10.99.1.92)1> Node.list

[:"node1@10.99.1.50"]

Joe asks:

What If I Only Have One Computer?

If you only have one computer at hand and still want to experiment with clustering, you have a few options:

· Use virtual machines.

· Fire up Amazon EC2 or similar cloud instances.

· Run multiple nodes on a single computer. Although clearly this isn’t the most realistic situation, it is by far the easiest, and it allows you to sidestep firewall and network configuration issues if you’re having problems getting multiple nodes to work across machines.

Remote Execution

Now that we have two connected nodes, one can execute code on the other:

iex(node1@10.99.1.50)5> whoami = fn() -> IO.puts(Node.self) end

#Function<20.80484245 in :erl_eval.expr/5>

iex(node1@10.99.1.50)6> Node.spawn(:"node2@10.99.1.92", whoami)

#PID<8242.50.0>

node2@10.99.1.92

These deceptively simple lines of code demonstrate something amazingly powerful—not only has one node executed code on another, but the output appeared on the first node. This is because a process inherits its group leader from the process that spawned it, and (among other things) that specifies where output from IO.puts appears. That’s an awful lot going on under the hood!
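The group-leader inheritance mentioned above can be observed without a second node. This is a minimal single-node sketch, assuming a recent Elixir release; it only compares the two group leaders and does not demonstrate remote execution itself.

```elixir
parent = self()

# The spawned process reports its own group leader back to the parent.
spawn(fn ->
  send(parent, {:group_leader, Process.group_leader()})
end)

child_leader =
  receive do
    {:group_leader, leader} -> leader
  after
    1000 -> nil
  end

# The child inherited the group leader of the process that spawned it.
IO.inspect(child_leader == Process.group_leader())
```

The same inheritance applies when the process is spawned on another node, which is why output from IO.puts finds its way back to the spawning node.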

Joe asks:

How Do I Manage My Cluster?

A system that allows one machine to execute arbitrary code on another is extremely powerful. Like any powerful tool, it can also be dangerous. In particular, you need to pay careful attention to security when thinking about cluster management. That’s where the --cookie argument we gave to iex comes in—an Erlang node will accept connection requests only from nodes that have the same cookie. There are other approaches to securing an Erlang cluster, such as tunneling internode connections over SSL.

Security is not the only question you need to think about. In the preceding example, I chose to specify the IP address in the node name because that works on most network configurations (and I don’t know how your network is configured). But it may not (probably will not) be the best choice for production use.

Cluster design trade-offs are subtle and beyond the scope of this book. Please make sure that you read the documentation about these questions before rolling out a production cluster.

Remote Messaging

As you would expect, an actor running on one node can send messages to an actor running on another. To demonstrate, let’s spawn an instance of the Counter actor we created earlier (see the code) on one node:

iex(node2@10.99.1.92)1> pid = spawn(Counter, :loop, [42])

#PID<0.51.0>

iex(node2@10.99.1.92)2> :global.register_name(:counter, pid)

:yes

After spawning it, we register it using :global.register_name, which is similar to Process.register, except that the name is cluster-global.

We can then use :global.whereis_name on another node to retrieve the process identifier and send it messages:

iex(node1@10.99.1.50)1> Node.connect(:"node2@10.99.1.92")

true

iex(node1@10.99.1.50)2> pid = :global.whereis_name(:counter)

#PID<7856.51.0>

iex(node1@10.99.1.50)3> send(pid, {:next})

{:next}

iex(node1@10.99.1.50)4> send(pid, {:next})

{:next}

Sure enough, we see this on the node where we spawned the counter (node2):

Current count: 42

Current count: 43

Note that again the output appears on the node upon which the actor generating it was spawned.
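The register-then-look-up pattern can be tried on a single node as well. The sketch below assumes a recent Elixir release. CounterSketch is a hypothetical variant of the Counter actor that replies to the sender instead of printing, so that the result is visible to the caller, and :counter_sketch is an arbitrary name.

```elixir
defmodule CounterSketch do
  # Replies with the current count, then waits for the next request.
  def loop(count) do
    receive do
      {:next, from} ->
        send(from, {:count, count})
        loop(count + 1)
    end
  end
end

pid = spawn(CounterSketch, :loop, [42])

# Register under a cluster-global name; :yes signals success.
:yes = :global.register_name(:counter_sketch, pid)

# Any connected node could perform this lookup and get the same pid.
found = :global.whereis_name(:counter_sketch)

replies =
  for _ <- 1..2 do
    send(found, {:next, self()})

    receive do
      {:count, n} -> n
    after
      1000 -> :timeout
    end
  end

IO.inspect(replies)
```

The code that looks up the name and sends messages is the same whether the registered process lives on this node or on another connected node.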

Distributed Word Count

We’re going to finish off our discussion of actors and Elixir by creating a distributed version of the Wikipedia word-count example we’ve seen in previous chapters. Like the solutions we’ve already seen, this will be able to leverage multiple cores. Unlike those we’ve already seen, it will also be able to scale beyond a single machine and recover from failures.

Here’s a diagram of the architecture we’re aiming for:

images/Messages2

Our solution is divided into three types of actors: one Parser, multiple Counters, and one Accumulator. The Parser is responsible for parsing a Wikipedia dump into pages, Counters count words within pages, and the Accumulator keeps track of total word counts across pages.

Processing is kicked off by a Counter requesting a page from the Parser. When the Counter receives the page, it counts the words contained within and sends them to the Accumulator. Finally, the Accumulator lets the Parser know that the page has been processed.

We’ll discuss why we chose this particular arrangement soon, but first let’s see how it’s implemented, starting with Counter.

Counting Words

Our Counter module implements a simple stateless actor that receives pages from the Parser and delivers the resulting word counts to the Accumulator. Here it is in full:

Actors/word_count/lib/counter.ex

Line 1   defmodule Counter do
     -     use GenServer.Behaviour
     -     def start_link do
     -       :gen_server.start_link(__MODULE__, nil, [])
     5     end
     -     def deliver_page(pid, ref, page) do
     -       :gen_server.cast(pid, {:deliver_page, ref, page})
     -     end
     -
    10     def init(_args) do
     -       Parser.request_page(self())
     -       {:ok, nil}
     -     end
     -
    15     def handle_cast({:deliver_page, ref, page}, state) do
     -       Parser.request_page(self())
     -
     -       words = String.split(page)
     -       counts = Enum.reduce(words, HashDict.new, fn(word, counts) ->
    20         Dict.update(counts, word, 1, &(&1 + 1))
     -       end)
     -       Accumulator.deliver_counts(ref, counts)
     -       {:noreply, state}
     -     end
    25   end

This follows the normal pattern for an OTP server—a public API (in this case start_link and deliver_page) followed by initialization (init) and message handlers (handle_cast).

Each Counter kicks things off by calling Parser.request_page during initialization (line 11).

Each time it receives a page, a Counter starts by requesting another page (line 16—we do this first to minimize latency). It then counts the words contained within the page, building a dictionary called counts (lines 18-21). Finally, those counts are sent to the Accumulator along with the reference (ref) that was sent with the page.
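The counting step on its own can be sketched as a pure function. This version assumes a recent Elixir release, where Map takes the place of HashDict and Dict; WordCountSketch is a hypothetical name used only here.

```elixir
defmodule WordCountSketch do
  # Splits the page on whitespace and tallies each word.
  def count(page) do
    page
    |> String.split()
    |> Enum.reduce(%{}, fn word, counts ->
      # Start a new word at 1, otherwise increment its existing tally.
      Map.update(counts, word, 1, &(&1 + 1))
    end)
  end
end

IO.inspect(WordCountSketch.count("the cat sat on the mat"))
```

In the result, "the" maps to 2 and every other word maps to 1.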

Next, CounterSupervisor allows us to create and supervise multiple Counters:

Actors/word_count/lib/counter.ex

defmodule CounterSupervisor do
  use Supervisor.Behaviour

  def start_link(num_counters) do
    :supervisor.start_link(__MODULE__, num_counters)
  end

  def init(num_counters) do
    workers = Enum.map(1..num_counters, fn(n) ->
      worker(Counter, [], id: "counter#{n}")
    end)
    supervise(workers, strategy: :one_for_one)
  end
end

CounterSupervisor.init takes the number of counters we want to create, which we use to create a workers list of that length. Note that each worker needs to have a distinct id, which we achieve by mapping over the range 1..num_counters.

Keeping Track of Totals

The Accumulator actor maintains two elements of state: totals, a dictionary containing accumulated counts, and processed_pages, a set containing the references of all the pages that it’s processed.

Actors/word_count/lib/accumulator.ex

Line 1   defmodule Accumulator do
     -     use GenServer.Behaviour
     -
     -     def start_link do
     5       :gen_server.start_link({:global, :wc_accumulator}, __MODULE__,
     -         {HashDict.new, HashSet.new}, [])
     -     end
     -
     -     def deliver_counts(ref, counts) do
    10       :gen_server.cast({:global, :wc_accumulator}, {:deliver_counts, ref, counts})
     -     end
     -
     -     def handle_cast({:deliver_counts, ref, counts}, {totals, processed_pages}) do
     -       if Set.member?(processed_pages, ref) do
    15         {:noreply, {totals, processed_pages}}
     -       else
     -         new_totals = Dict.merge(totals, counts, fn(_k, v1, v2) -> v1 + v2 end)
     -         new_processed_pages = Set.put(processed_pages, ref)
     -         Parser.processed(ref)
    20         {:noreply, {new_totals, new_processed_pages}}
     -       end
     -     end
     -   end

We create a global name for our accumulator by passing {:global, :wc_accumulator} to :gen_server.start_link (line 5). We can use this directly when sending messages with :gen_server.cast (line 10).

When a set of counts is delivered to the accumulator, it first checks to see if it’s already processed counts for this page (we’ll soon see why this is important and why it might receive counts twice). If it hasn’t, it merges the counts into totals with Dict.merge, adds the page reference to processed_pages with Set.put, and notifies the parser that the page has been processed.
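The merge-once logic can be isolated as a pure function over the accumulator's state. This sketch assumes a recent Elixir release, with Map and MapSet standing in for HashDict and HashSet; AccumulatorSketch is a hypothetical name, and the atoms used as page references are placeholders for the references created by make_ref.

```elixir
defmodule AccumulatorSketch do
  # State is {totals, processed_pages}. Counts for a page reference
  # that has already been seen are ignored, so nothing is counted twice.
  def deliver({totals, processed_pages}, ref, counts) do
    if MapSet.member?(processed_pages, ref) do
      {totals, processed_pages}
    else
      new_totals = Map.merge(totals, counts, fn _word, v1, v2 -> v1 + v2 end)
      {new_totals, MapSet.put(processed_pages, ref)}
    end
  end
end

state = {%{}, MapSet.new()}
state = AccumulatorSketch.deliver(state, :page1, %{"the" => 2, "cat" => 1})
state = AccumulatorSketch.deliver(state, :page2, %{"the" => 1})

# A repeated delivery for :page1 leaves the totals unchanged.
{totals, _processed} = AccumulatorSketch.deliver(state, :page1, %{"the" => 2, "cat" => 1})
IO.inspect(totals)
```

After all three deliveries "the" totals 3 and "cat" totals 1, exactly as if the duplicate had never arrived.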

Parsing and Fault Tolerance

Parser is the most complex of our three types of actor, so we’ll break it down into chunks. First, here’s its public API:

Actors/word_count/lib/parser.ex

defmodule Parser do
  use GenServer.Behaviour

  def start_link(filename) do
    :gen_server.start_link({:global, :wc_parser}, __MODULE__, filename, [])
  end

  def request_page(pid) do
    :gen_server.cast({:global, :wc_parser}, {:request_page, pid})
  end

  def processed(ref) do
    :gen_server.cast({:global, :wc_parser}, {:processed, ref})
  end
end

As with Accumulator, Parser registers itself with a global name during startup. It supports two operations—request_page, which is called by a Counter to request a page, and processed, which is called by the Accumulator to indicate that a page has been successfully processed.

Here’s the implementation of the message handlers for these two operations:

Actors/word_count/lib/parser.ex

def init(filename) do
  xml_parser = Pages.start_link(filename)
  {:ok, {ListDict.new, xml_parser}}
end

def handle_cast({:request_page, pid}, {pending, xml_parser}) do
  new_pending = deliver_page(pid, pending, Pages.next(xml_parser))
  {:noreply, {new_pending, xml_parser}}
end

def handle_cast({:processed, ref}, {pending, xml_parser}) do
  new_pending = Dict.delete(pending, ref)
  {:noreply, {new_pending, xml_parser}}
end

Parser maintains two items of state: pending, which is a ListDict of references for pages that have been sent to a Counter but not yet processed, and xml_parser, which is an actor that uses the Erlang xmerl library to parse a Wikipedia dump (we won’t show its implementation here—see the code that accompanies this book if you’re interested).[28]

Handling a :processed message simply requires deleting the processed page from pending. Handling a :request_page message involves retrieving the next available page from the XML parser and passing it to deliver_page:

Actors/word_count/lib/parser.ex

defp deliver_page(pid, pending, page) when nil?(page) do
  if Enum.empty?(pending) do
    pending # Nothing to do
  else
    {ref, prev_page} = List.last(pending)
    Counter.deliver_page(pid, ref, prev_page)
    Dict.put(Dict.delete(pending, ref), ref, prev_page)
  end
end

defp deliver_page(pid, pending, page) do
  ref = make_ref()
  Counter.deliver_page(pid, ref, page)
  Dict.put(pending, ref, page)
end

The implementation of deliver_page uses an Elixir feature we’ve not seen before—a guard clause specified by the when in the first deliver_page clause. A guard clause is a Boolean expression—the function clause matches only if the guard is true.

Let’s consider the case when page is non-nil first. In this case, we create a new unique reference with make_ref, deliver the page to the counter that requested it, and add the page to our pending dictionary.

If page is nil, that indicates that the XML parser has finished parsing the Wikipedia dump. In that case, we remove the oldest entry from pending and send it, and remove and re-add it to pending so that it’s now the youngest entry.

Why this second case? Surely every pending batch will eventually be processed. What do we gain by sending it to another Counter?

The Big Win

What we gain is fault tolerance. If a Counter exits or the network goes down or the machine it’s running on dies, we’ll just end up sending the page it was processing to another Counter. Because each page has a reference associated with it, we know which pages have been processed and won’t double-count.
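The re-delivery rule that makes this work can be sketched without any actors. The version below assumes a recent Elixir release and models pending as a plain list of {ref, page} pairs with the youngest entry first, which is an assumption made for the sketch and not necessarily how ListDict orders its entries. PendingSketch and next/2 are hypothetical names; the function returns what to deliver together with the updated pending list.

```elixir
defmodule PendingSketch do
  # No new page and nothing outstanding: nothing to deliver.
  def next(pending, page) when is_nil(page) and pending == [] do
    {:none, pending}
  end

  # No new page, but work is outstanding: re-deliver the oldest
  # entry and move it to the front so it becomes the youngest.
  def next(pending, page) when is_nil(page) do
    oldest = List.last(pending)
    {oldest, [oldest | List.delete_at(pending, -1)]}
  end

  # A fresh page: tag it with a new reference and record it as pending.
  def next(pending, page) do
    entry = {make_ref(), page}
    {entry, [entry | pending]}
  end
end

# :a is the oldest entry, :b the youngest (atoms stand in for references).
pending = [{:b, "page B"}, {:a, "page A"}]
IO.inspect(PendingSketch.next(pending, nil))
```

With no new page available, the oldest entry is chosen for re-delivery and rotated to the front, so repeated requests cycle through every page that has not yet been acknowledged.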

To convince yourself, try starting a cluster. On one machine, start a Parser and an Accumulator. On one or more other machines, start a number of Counters. If you pull the network cable out the back of a machine running counters, or kill the virtual machine they’re running in, the remaining counters will continue to process pages, including those that were in progress on that machine.

This is a great example of the benefits of concurrent, distributed development. This program will hardly miss a beat when faced with a hardware failure that would kill a normal sequential or multithreaded program.

Day 3 Wrap-Up

That brings us to the end of day 3 and our discussion of actors.

What We Learned in Day 3

Elixir allows us to create clusters of nodes. An actor on one node can send messages to an actor running on another in exactly the same way as it can to one running locally. As well as allowing us to create systems that leverage multiple distributed computers, it allows us to recover from the failure of one of those computers.

Day 3 Self-Study

Find

· Joe Armstrong’s Lambda Jam presentation, “Systems That Run Forever Self-Heal and Scale.”

· What is an OTP application? Why might one be more accurately described as a component?

· So far, the state of every actor we’ve created has been lost if that actor dies. What support does Elixir provide for persistent state?

Do

· The fault-tolerant word-count program we developed can handle failure of a counter or the machine that it’s running on, but not the parser or accumulator. Create a version that can handle failure of any actor or node.

Wrap-Up

Alan Kay, the designer of Smalltalk and father of object-oriented programming, had this to say on the essence of object orientation:[29]

I’m sorry that I long ago coined the term “objects” for this topic because it gets many people to focus on the lesser idea.

The big idea is “messaging” … The Japanese have a small word—ma—for “that which is in-between”—perhaps the nearest English equivalent is “interstitial.” The key in making great and growable systems is much more to design how its modules communicate rather than what their internal properties and behaviors should be.

This captures the essence of actor programming very well—we can think of actors as the logical extension of object-oriented programming to the concurrent world. Indeed, you can think of actors as more object-oriented than objects, with stricter message passing and encapsulation.

Strengths

Actors have a number of features that make them ideal for solving a wide range of concurrent problems.

Messaging and Encapsulation

Actors do not share state and, although they run concurrently with each other, within a single actor everything is sequential. This means that we need only worry about concurrency when considering message flows between actors.

This is a huge boon to the developer. An actor can be tested in isolation and, as long as our tests accurately represent the types of messages that might be delivered and in what order, we can have high confidence that it behaves as it should. And if we do find ourselves faced with a concurrency-related bug, we know where to look—the message flows between actors.

Fault Tolerance

Fault tolerance is built into actor programs from the outset. This enables not only more resilient programs but also simpler and clearer code (through the “let it crash” philosophy).

Distributed Programming

Actors’ support for both shared- and distributed-memory architectures brings a number of significant advantages:

Firstly, it allows an actor program to scale to solve problems of almost any size. We are not limited to problems that fit on a single system.

Secondly, it allows us to address problems where geographical distribution is an intrinsic consideration. Actors are an excellent choice for programs where different elements of the software need to reside in different geographical locations.

Finally, distribution is a key enabler for resilient and fault-tolerant systems.

Weaknesses

Although a program constructed with actors is easier to debug than one constructed with threads and locks, actors are still susceptible to problems like deadlock plus a few failure modes unique to actors (such as overflowing an actor’s mailbox).
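A growing mailbox is at least easy to observe. The following sketch, assuming a recent Elixir release, sends a few messages to a process that never reads them and then asks the runtime how many are queued; the {:job, n} message shape is an arbitrary choice for the example.

```elixir
# A process that never reads its mailbox.
slow = spawn(fn -> Process.sleep(:infinity) end)

for n <- 1..3, do: send(slow, {:job, n})

# Process.info/2 reports how many messages are waiting unread.
{:message_queue_len, backlog} = Process.info(slow, :message_queue_len)
IO.inspect(backlog)

Process.exit(slow, :kill)
```

Watching this number is one way to spot an actor that is receiving messages faster than it can handle them.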

As with threads and locks, actors provide no direct support for parallelism. Parallel solutions need to be built from concurrent building blocks, raising the specter of nondeterminism. And because actors do not share state and can only communicate through message passing, they are not a suitable choice if you need fine-grained parallelism.

Other Languages

As with most good ideas, the actor model is not new—it was first described in the 1970s, most notably by Carl Hewitt. The language that has done most to popularize actor programming, however, is unquestionably Erlang. For example, Erlang’s creator, Joe Armstrong, is the originator of the “let it crash” philosophy.

Most popular programming languages now have an actor library available; in particular the Akka toolkit can be used to add actor support to Java or any other JVM-based language.[30] If you’re interested in learning more about Akka, see the online bonus chapter,[31] which describes actor programming in Scala.

Final Thoughts

Actor programming is one of the most widely applicable programming models out there—not only does it provide support for concurrency, but it also provides distribution, error detection, and fault tolerance. As such, it’s a good fit for the kinds of programming problems we find ourselves faced with in today’s increasingly distributed world.

In the next chapter we’ll look at communicating sequential processes. Although CSP has surface similarities with actors, its emphasis on the channels used for communication, rather than the entities between which communication takes place, leads to it having a very different flavor.

Footnotes

[25] http://www.erlang.org/

[26] http://elixir-lang.org/

[27] http://zoo.cs.yale.edu/classes/cs422/2011/bib/hoare81emperor.pdf

[28] http://www.erlang.org/doc/apps/xmerl/

[29] http://c2.com/cgi/wiki?AlanKayOnMessaging

[30] http://akka.io

[31] http://media.pragprog.com/titles/pb7con/Bonus_Chapter.pdf