Asynchronous and Parallel Programming - The Book of F#: Breaking Free with Managed Functional Programming (2014)

Chapter 11. Asynchronous and Parallel Programming

For most of computing’s history, software developers have been spoiled by processor manufacturers that were constantly pushing the limits of their chips’ clock speeds. If you needed your software to run faster (to process larger data sets, or because users were complaining about the system freezing when it was really just busy), often all you had to do was upgrade to the latest chip. Over the past decade or so something changed: Processor manufacturers began improving processor performance not by increasing clock speeds but by adding processing cores.

Although processor architecture has changed, software architecture has largely remained static. Multicore processors have become the norm, yet many applications are still written as though only one core is available to them and thus are not taking full advantage of the underlying hardware. Long-running tasks are still being executed on the UI thread, and large data sets are often processed synchronously. A big reason for this is that, traditionally, asynchronous and parallel programming have been sufficiently complex and error prone that they were typically the domain of expert developers working on highly specialized software.

Fortunately, software is starting to catch up. Programmers are learning that the days of solving performance issues by throwing faster hardware at the problem have passed and that it’s increasingly important to consider concurrent processing needs at an architectural level.

Although they’re closely related, asynchronous and parallel programming have different goals. Asynchronous programming aims to separate processing and reduce blocking so that longer-running tasks don’t prevent the system from completing other tasks within the same process. By contrast, parallel processing aims to improve performance by partitioning work into chunks that can be distributed across processors and operated against independently.

Since its inception, the .NET Framework has supported both asynchronous and parallel programming through threads and a multitude of synchronization mechanisms such as monitors, mutexes, semaphores, and so on. The Asynchronous Programming Model (APM), where classes define BeginX and EndX methods for operations that should be run asynchronously (such as the BeginRead and EndRead methods on the System.IO.FileStream class), has long been the preferred approach to asynchronous programming in .NET.

In this chapter, we’ll explore several ways that F# makes asynchronous and parallel programming more accessible, thereby freeing you to focus on creating correct solutions. We’ll begin with a brief introduction to the Task Parallel Library. Next, we’ll discuss another F# construct: asynchronous workflows. Finally, we’ll conclude with an introduction to the MailboxProcessor, F#’s agent-based model for asynchronous programming.

Task Parallel Library

As its name implies, the Task Parallel Library (TPL) excels at handling parallel programming scenarios and is the preferred mechanism for CPU-bound operations. It abstracts much of the complexity of managing threads, locks, callbacks, cancellations, and exception handling behind a uniform interface. Although the TPL is not specific to F#, a basic understanding of it is helpful, especially if you need to interact with code from libraries that use it.

The TPL enables two types of parallelism: data parallelism and task parallelism.

§ Data parallelism. Involves performing a specific action against each value in a sequence by distributing the work effectively across available processing resources. Under the data parallelism model, you specify a sequence along with an action and the TPL determines how to partition the data and distributes the work accordingly.

§ Task parallelism. Focuses on executing independent tasks concurrently. With task parallelism, you are responsible for manually creating and managing tasks, but this model offers you more control. Through the various Task classes, you can easily initiate asynchronous processing, wait for tasks to complete, return values, set up continuations, or spawn additional tasks.

NOTE

This section is not intended to be a comprehensive guide to the TPL. Thus, it won’t get into many of the intricacies of task creation, scheduling, management, or other associated topics. The intention here is to establish a baseline, providing you with enough information to make you immediately productive when writing code using the TPL.

Potential Parallelism

One of the key differences between working directly with threads and using the TPL is that the TPL is task based rather than thread based. This difference is quite important in that the TPL tries to run tasks concurrently by pulling threads from the thread pool, but it does not guarantee parallelism. This is known as potential parallelism.

Whenever you create a thread directly, you incur the overhead of allocating and scheduling it. This overhead can be detrimental to overall system performance if there aren’t enough system resources available to support it. The basic concurrency mechanisms, like thread pooling, help reduce the impact by reusing existing threads, but the TPL goes a step further by taking available system resources into account. If there aren’t sufficient resources available or the TPL otherwise determines that running a task in parallel will be detrimental to performance, it will run the task synchronously. As resources fluctuate over time, the TPL’s task scheduling and work partitioning algorithms help rebalance work to use the available resources effectively.

Data Parallelism

Data parallelism is achieved primarily through the use of the static For and ForEach methods of the Parallel class located in the System.Threading.Tasks namespace. As their names imply, these methods are essentially parallel versions of the simple and enumerable for loops, respectively.

NOTE

Data parallelism can also be achieved through PLINQ’s (Parallel LINQ) AsParallel extension method. To simplify working with parallel sequences in F#, the PSeq module in the F# PowerPack exposes many of the ParallelEnumerable methods using the same nomenclature as the Seq module.

For normal usage, Parallel.For and Parallel.ForEach differ only by their input; Parallel.For accepts range boundaries, whereas Parallel.ForEach accepts a sequence. Both methods also accept a function that serves as the loop body, and they implicitly wait for all iterations to complete before returning control to the caller. Since the methods are so similar, the examples in this section will use Parallel.For for consistency.
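For comparison, here is a minimal sketch of the Parallel.ForEach form. The words array and the lengths bag are hypothetical names chosen for illustration, and the explicit Action<string> is an assumption made to keep overload resolution unambiguous rather than a requirement stated by the TPL.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading.Tasks

// Hypothetical input; Parallel.ForEach accepts any sequence.
let words = [| "alpha"; "beta"; "gamma"; "delta" |]

// ConcurrentBag can safely be added to from several threads at once.
let lengths = ConcurrentBag<string * int>()

// The explicit Action<string> helps F# pick one of the ForEach overloads.
Parallel.ForEach(words, Action<string>(fun w -> lengths.Add((w, w.Length))))
|> ignore

// Iteration order is not guaranteed, so sort before printing.
lengths
|> Seq.sortBy fst
|> Seq.iter (fun (w, len) -> printfn "%s: %i" w len)
```

Like Parallel.For, the call returns only after every item has been processed, which is why the bag can be read immediately afterward.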

The simplest form, the parallel for loop, simply invokes an action for each value in the range. Here, we use a parallel for loop to write the numbers 0 through 99.

open System

open System.Threading.Tasks

Parallel.For(0, 100, printfn "%i")

This snippet is pretty self-explanatory. The first argument passed to Parallel.For identifies the inclusive beginning of the range, while the second identifies the exclusive end of the range. The third argument is a function that writes a number to the console.

Locking and Lock Avoidance

Now that we’re dealing with concurrency, there’s a subtle bug in the previous example. Internally, printfn incrementally sends its text to System.Console.Out as it parses the pattern. Hence, it’s possible that as each parallel iteration executes, multiple calls to printfn will be invoked simultaneously, resulting in some items being interlaced.

NOTE

The example used for this discussion is less of an issue in F# 3.1, where printf and its related functions have been improved such that they run up to 40 times faster than in previous releases.

We can address this problem a few ways. One approach is to control access to System.Console.Out with the lock operator. The lock operator serves the same purpose as the lock statement in C# (SyncLock in Visual Basic) in that it prevents additional threads from executing a block of code until the locked resource is freed. Here is the previous example reworked to use locking:

Parallel.For(0, 100, fun n -> lock Console.Out (fun () -> printfn "%i" n))

There are times when locking is appropriate, but using it like this is a horrible idea. By locking, we negate most of the benefits of parallelizing the loop because only one item can be written at a time! Instead, we want to try another approach that avoids locking and doesn’t interlace the results.

One of the easiest ways to achieve a satisfactory result is with function composition. Here, we use the sprintf function to format the number and send that result to Console.WriteLine:

Parallel.For(0, 100, (sprintf "%i") >> Console.WriteLine)

This approach works because each call to sprintf writes to an isolated StringBuilder rather than a shared TextWriter. This eliminates the need to lock, thereby eliminating a potential bottleneck in your application.

Short-Circuiting Parallel Loops

Unlike F#’s built-in for loops, parallel loops provide some short-circuiting mechanisms by means of the ParallelLoopState class’s Break and Stop methods. The TPL handles creating and managing the loop state, so all you need to do to access either of these methods is use one of the overloads that exposes it. Consider the following shortCircuitExample function:

open System.Collections.Concurrent
open System.Threading.Tasks

let shortCircuitExample shortCircuit =
    let bag = ConcurrentBag<_>()
    Parallel.For(
        0,
        999999,
        fun i s -> if i < 10000 then bag.Add i else shortCircuit s) |> ignore // ①
    (bag, bag.Count)

Like the previous examples, the shortCircuitExample function uses Parallel.For, but notice at ① that the supplied function accepts two parameters instead of one. The second parameter, s, is the loop state.

With shortCircuitExample in place we can now invoke it, passing a function that accepts a ParallelLoopState instance and calls either Stop or Break, like this:

shortCircuitExample (fun s -> s.Stop()) |> printfn "%A"

shortCircuitExample (fun s -> s.Break()) |> printfn "%A"

Both of the preceding lines will force the parallel loop to terminate before all iterations complete, but they have very different effects. Stop causes the loop to terminate at its earliest convenience: iterations that are already executing are allowed to finish, but no new iterations are started. Break, on the other hand, lets the loop continue until every iteration with an index lower than the one that called Break has run, and only then terminates it. Take care not to call both Stop and Break on the same loop; doing so raises an InvalidOperationException.

The difference between these two methods can be drastic. For example, in one run on my desktop, the Break version processed 10,000 items, whereas the Stop version processed only 975.
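To see what Break guarantees, we can inspect the ParallelLoopResult that Parallel.For returns. The following is a small sketch rather than a benchmark; the break point of 100 and the processed bag are arbitrary choices for illustration.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading.Tasks

let processed = ConcurrentBag<int>()

// Only iteration 100 calls Break, so it becomes the lowest break iteration.
let result =
    Parallel.For(
        0,
        1000,
        Action<int, ParallelLoopState>(fun i state ->
            processed.Add i
            if i = 100 then state.Break()))

// IsCompleted is false because the loop ended early.
printfn "Completed: %b" result.IsCompleted

// LowestBreakIteration is a Nullable<int64>; it has a value only after Break.
printfn "Lowest break iteration: %i" result.LowestBreakIteration.Value

// Every index below the break point was processed, although some higher
// indexes may have run as well.
let seen = Set.ofSeq processed
let allLowerRan = Set.isSubset (Set.ofList [ 0 .. 99 ]) seen
printfn "All lower iterations ran: %b" allLowerRan
```

Had the body called Stop instead, LowestBreakIteration would have no value and there would be no guarantee about which lower indexes were processed.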

Cancelling Parallel Loops

Cancelling a parallel for loop is similar to short-circuiting, except that instead of using the Stop or Break methods to terminate the loop from within, you identify an external cancellation token that the loop monitors and responds to. Unlike the short-circuiting mechanism, cancellation forces all tasks configured with the same token to stop. Cancelling does raise an OperationCanceledException, so you’ll want to handle that accordingly.

The following function demonstrates cancelling a parallel for loop:

open System
open System.Threading.Tasks

let parallelForWithCancellation (wait : int) =
    use tokenSource = new (* ① *) System.Threading.CancellationTokenSource(wait)
    try
        Parallel.For(
            0,
            Int32.MaxValue,
            ParallelOptions((* ③ *) CancellationToken = (* ④ *) tokenSource.Token), // ②
            fun (i : int) -> Console.WriteLine i
        ) |> ignore
    with
    | :? (* ⑤ *) OperationCanceledException -> printfn "Cancelled!"
    | ex -> printfn "%O" ex

In the preceding code, we create a CancellationTokenSource at ①. This object is initialized to automatically cancel after a provided number of milliseconds. Inside the try block, we use an overload of Parallel.For that accepts a ParallelOptions instance as shown at ②. Through this ParallelOptions instance, we initialize the CancellationToken property ③ to the token exposed by the CancellationTokenSource ④. When the token source’s internal timer expires, the parallel loop raises an exception, which is then caught and handled at ⑤.

Although we relied on a CancellationTokenSource that automatically cancelled, you can manually force cancellation by calling the Cancel method, typically from another task or thread.
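Manual cancellation might look something like the following sketch. The cancelFromAnotherTask name, the 50-millisecond delay, and the 1-millisecond loop body are all assumptions made for illustration; the point is only that a second task calls Cancel while the loop is running.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

let cancelFromAnotherTask () =
    use tokenSource = new CancellationTokenSource()
    // A separate task requests cancellation after a short, arbitrary delay.
    let canceller =
        Task.Factory.StartNew(Action(fun () ->
            Thread.Sleep(50)
            tokenSource.Cancel()))
    let wasCancelled =
        try
            Parallel.For(
                0,
                Int32.MaxValue,
                ParallelOptions(CancellationToken = tokenSource.Token),
                Action<int>(fun _ -> Thread.Sleep(1)))
            |> ignore
            false // the loop ran to completion, which is not expected here
        with
        | :? OperationCanceledException -> true
    // Make sure the cancelling task is finished before the source is disposed.
    canceller.Wait()
    wasCancelled

cancelFromAnotherTask () |> printfn "Cancelled: %b"
```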

Task Parallelism

Task parallelism gives you the most control over executing code in parallel while still abstracting many of the implementation details from you.

Creating and Starting Tasks

Tasks can be created and started in several ways. The easiest, but least flexible, way is the Parallel.Invoke method, which accepts one or more functions to execute concurrently and implicitly waits for them to finish, like this:

open System
open System.Threading.Tasks

Parallel.Invoke(
    (fun () -> printfn "Task 1"),
    (fun () -> Task.Delay(100).Wait()
               printfn "Task 2"),
    (fun () -> printfn "Task 3")
)

printfn "Done"

Here, Parallel.Invoke creates and starts three independent tasks. The first and third tasks simply print a message, while the second task waits 100 milliseconds before printing its message.

Parallel.Invoke limits what you can do because it doesn’t expose any information about the individual tasks, nor does it provide any feedback about whether the tasks succeeded or failed. You can catch and handle exceptions raised by the tasks and cancel them by providing a cancellation token (similar to the approach used in Cancelling Parallel Loops), but that’s about it. When you want to do anything more advanced with tasks, you’ll need to create them manually.

There are two ways to create tasks manually: directly via a constructor, or through a TaskFactory. For our purposes, the primary difference between the two approaches is that when creating tasks with the constructor you must manually start them. Microsoft recommends favoring the TaskFactory when task creation and scheduling don’t need to be separated.

To create a new task with the Task constructor, you need only provide a function that serves as the task’s body, like this:

open System.Threading.Tasks

let t = new Task(fun () -> printfn "Manual Task")

This creates a new task that prints a string. To start the task, call its Start method.

t.Start()
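Because a task built with the constructor does nothing until it is started, its Status property is a convenient way to observe the separation between creation and scheduling. The sketch below uses a hypothetical task named manual; the explicit Action wrapper is there only to make the chosen constructor overload obvious.

```fsharp
open System
open System.Threading.Tasks

let manual = new Task(Action(fun () -> Console.WriteLine "Manual Task"))

// The task exists but has not been scheduled yet.
let statusBeforeStart = manual.Status // TaskStatus.Created

manual.Start()
manual.Wait()

// After Wait returns without an exception, the task has run to completion.
let statusAfterWait = manual.Status // TaskStatus.RanToCompletion

printfn "%A -> %A" statusBeforeStart statusAfterWait
```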

Alternatively, you can combine the two steps into one with a TaskFactory. Conveniently, the Task class has a static Factory property that is preset to a default TaskFactory, so you don’t need to create one on your own. Here, we create and start a task using the default factory’s StartNew method:

open System.Threading.Tasks

let t = Task.Factory.StartNew(fun () -> printfn "Factory Task")

Returning Values from Tasks

The tasks we’ve looked at so far simply invoke an action, but you also need to know how to return a value—a commonly needed but cumbersome process under traditional asynchronous models. The TPL makes returning values trivial through a generic Task<'T> class, where 'T represents the task’s return type.

WARNING

The random-number generation used in the following examples is sufficient for demonstration purposes, but be aware that the System.Random class is not thread-safe and even creating a new instance per task may not be sufficient. Should your solution require a more robust approach to concurrently generating random numbers, I recommend reading Stephen Toub’s article on the subject at http://blogs.msdn.com/b/pfxteam/archive/2009/02/19/9434171.aspx.

Creating tasks that return values is almost identical to the basic tasks we’ve already looked at. The Task<'T> class provides a set of constructor overloads that are comparable to that of the non-generic Task class, and the TaskFactory includes a generic overload of StartNew. To demonstrate, let’s use StartNew<'T> to create and run a task that returns a random number.

let t = Task.Factory.StartNew(fun () -> System.Random().Next())

The only truly notable thing about this example is that the function passed to StartNew returns an integer and the generic overload is inferred. Of course, returning a value doesn’t do much good without a way to access that value, and that’s why Task<'T> provides the Result property, which will contain the return value when the task completes. Here, we see how to access the return value:

t.Result |> printfn "Result: %i"

Because this is an asynchronous operation, there’s no guarantee that the task has completed executing before the Result property is accessed. For this reason, Result’s get accessor checks whether the task has completed and waits for it to complete if necessary before returning its result. It’s more typical to access the result as part of a continuation (as shown a bit later in this chapter) than immediately after the task starts.

Waiting for Task Completion

When your program depends on one or more tasks completing before it can continue processing, you can wait for those tasks using one of the wait mechanisms. For convenience, the examples in this section will use the following function, which returns a new function that sleeps for a random amount of time (simulating a long-running operation lasting up to delayMs) before printing a message:

let randomWait (delayMs : int) (msg : string) =
    fun () -> (System.Random().Next delayMs |> Task.Delay).Wait()
              Console.WriteLine msg

We can use the TaskFactory to create a task and wait for it to complete with the task’s Wait method like this:

let waitTask = Task.Factory.StartNew(randomWait 1000 "Task Finished")

waitTask.Wait()

printfn "Done Waiting"

In this code, a new task is created and started, but the message “Done Waiting” won’t be written to the console until it completes due to the explicit wait. This can be helpful when subsequent code is dependent upon the task’s completion.

You’ll often want to run a number of tasks in parallel and block until one completes. To do so, you can use the static WaitAny method from the Task class. The most basic WaitAny overload accepts a params array of tasks and will stop blocking as soon as one of the tasks in the array completes. Here, we pass three started tasks to WaitAny:

Task.WaitAny(
    Task.Factory.StartNew(randomWait 2000 "Task 0 Finished"),
    Task.Factory.StartNew(randomWait 2000 "Task 1 Finished"),
    Task.Factory.StartNew(randomWait 2000 "Task 2 Finished"))

Console.WriteLine "Done Waiting"

When any of the three tasks complete, WaitAny will stop blocking, thus allowing execution to continue to the Console.WriteLine call. Note that WaitAny doesn’t kill the remaining tasks when it unblocks, so they’ll continue executing in parallel with the source thread.

Similar to WaitAny, the Task class provides a static WaitAll method. WaitAll also accepts a params array of tasks, but instead of allowing execution to continue when one task completes, WaitAll unblocks only when all of the tasks have completed. Because the code differs only by which method is called, I haven’t included a sample, but I encourage you to experiment with each. As you do so, run each form several times and observe the differences.
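As a starting point for that experimentation, here is one possible WaitAll sketch. It swaps the randomWait helper for tasks that return values so that the effect of waiting is visible in the results; startSquare and the delays are hypothetical choices.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Starts a task that sleeps briefly (an arbitrary delay) and returns n squared.
let startSquare (n : int) =
    Task.Factory.StartNew(Func<int>(fun () ->
        Thread.Sleep(n * 50)
        n * n))

let squareTasks = [| 1; 2; 3 |] |> Array.map startSquare

// WaitAll expects Task[], so upcast each Task<int> to its base class.
Task.WaitAll(squareTasks |> Array.map (fun t -> t :> Task))

// Every task has finished, so reading Result does not block here.
let squares = squareTasks |> Array.map (fun t -> t.Result)
printfn "%A" squares
```

Replacing WaitAll with WaitAny in this sketch would let execution continue after the first task finishes, at which point reading Result on the others could still block.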

Continuations

Traditionally, whenever you wanted to execute some code as soon as some parallel or asynchronous code completed, you needed to pass a function, called a callback, to the asynchronous code. In .NET, callbacks have typically been implemented through the built-in AsyncCallback delegate type.

Using callbacks is effective, but they can complicate the code and be tricky to maintain. The TPL greatly simplifies this process with continuations, which are tasks configured to start when one or more tasks, called antecedents, complete.

The simplest continuations are created from individual tasks. Let’s start by creating a task that will serve as an antecedent:

let antecedent =
    new Task<string>(
        fun () ->
            Console.WriteLine("Started antecedent")
            System.Threading.Thread.Sleep(1000)
            Console.WriteLine("Completed antecedent")
            "Job's done")

Now that we have a task, we can set up a continuation by passing a function to the task’s ContinueWith method, like so:

let continuation =
    antecedent.ContinueWith(
        fun (* ① *) (a : Task<string>) ->
            Console.WriteLine("Started continuation")
            Console.WriteLine("Antecedent status: {0}", a.Status)
            Console.WriteLine("Antecedent result: {0}", a.Result)
            Console.WriteLine("Completed continuation"))

As you can see, creating a continuation is very similar to creating a regular task, but notice at ① how the function passed to the ContinueWith method accepts a parameter of type Task<string>. This parameter represents the antecedent so that the continuation can branch according to the antecedent’s status (for example, RanToCompletion, Faulted, Canceled, and so on) or its result if it has one.

At this point, neither task has been started, so we’ll start antecedent. When it completes, the TPL will automatically start continuation. We can observe this behavior as follows:

antecedent.Start()

Console.WriteLine("Waiting for continuation")

continuation.Wait()

Console.WriteLine("Done")

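which should print the following messages (the first two may appear in either order, because the main thread and the antecedent run concurrently):

Waiting for continuation
Started antecedent
Completed antecedent
Started continuation
Antecedent status: RanToCompletion
Antecedent result: Job's done
Completed continuation
Done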
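Continuations can also return values of their own. The following sketch, which uses the hypothetical names source and lengthTask, branches on the antecedent’s status and produces an integer; the explicit Func type is an assumption made to steer overload resolution toward the value-returning form of ContinueWith.

```fsharp
open System
open System.Threading.Tasks

let source = Task.Factory.StartNew(Func<string>(fun () -> "Job's done"))

let lengthTask =
    source.ContinueWith(Func<Task<string>, int>(fun a ->
        // Check the antecedent's status before touching its result.
        if a.Status = TaskStatus.RanToCompletion then a.Result.Length else -1))

// Reading Result waits for both the antecedent and the continuation.
printfn "Length: %i" lengthTask.Result
```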

The ContinueWith method is useful when you’re dealing with only a single task. When you have multiple tasks, you can turn to the TaskFactory’s ContinueWhenAny or ContinueWhenAll methods. Like their WaitAny and WaitAll counterparts, the ContinueWhenAny and ContinueWhenAll methods will start the continuation task when any or all of the tasks in an array complete, respectively. For brevity we’ll focus on the ContinueWhenAll method.

let antecedents =
    [|
        new Task(
            fun () ->
                Console.WriteLine("Started first antecedent")
                System.Threading.Thread.Sleep(1000)
                Console.WriteLine("Completed first antecedent"))
        new Task(
            fun () ->
                Console.WriteLine("Started second antecedent")
                System.Threading.Thread.Sleep(1250)
                Console.WriteLine("Completed second antecedent"))
        new Task(
            fun () ->
                Console.WriteLine("Started third antecedent")
                System.Threading.Thread.Sleep(1000)
                Console.WriteLine("Completed third antecedent"))
    |]

let continuation =
    Task.Factory.ContinueWhenAll( // ①
        antecedents,
        fun (* ② *) (a : Task array) ->
            Console.WriteLine("Started continuation")
            for x in a do Console.WriteLine("Antecedent status: {0}", x.Status)
            Console.WriteLine("Completed continuation"))

for a in antecedents do a.Start()

Console.WriteLine("Waiting for continuation")
continuation.Wait()
Console.WriteLine("Done")

ContinueWhenAll follows the same pattern as WaitAll: you supply an array of tasks, and the continuation starts only after every one of them completes. Here we’ve defined three tasks, which we manually start after creating the continuation at ①.

Notice the continuation task’s parameter at ②. Instead of receiving a single antecedent task as you would with ContinueWith or ContinueWhenAny, continuations created with ContinueWhenAll accept an array of tasks. This array contains all of the tasks supplied to ContinueWhenAll instead of the individual task that caused the continuation to start. This allows you to inspect each antecedent and handle success and failure scenarios as granularly as you need.

Cancelling Tasks

Cancelling a task is fundamentally the same as cancelling a parallel for loop, but it requires a bit more work because the parallel for loops handle the cancellation details for you. The following function demonstrates cancelling a task and follows the typical pattern for handling the cancellation:

let taskWithCancellation (cancelDelay : int) (taskDelay : int) =
    use tokenSource = new System.Threading.CancellationTokenSource(cancelDelay) // ①
    let token = tokenSource.Token // ②
    try
        let t =
            Task.Factory.StartNew(
                (fun () ->
                    token.ThrowIfCancellationRequested() // ③
                    printfn "passed cancellation check; waiting"
                    System.Threading.Thread.Sleep taskDelay
                    token.ThrowIfCancellationRequested()), // ④
                token)
        t.Wait() // ⑤
    with
    | ex -> printfn "%O" ex
    printfn "Done"

As with cancelling parallel for loops, we start by creating a CancellationTokenSource at ①. For convenience, we then bind the token to a name at ② so we can reference it within the function the task is based upon. Within the task body, the first thing we do at ③ is call the token’s ThrowIfCancellationRequested method, which interrogates the token’s IsCancellationRequested property and throws an OperationCanceledException if that property returns true. We do this to ensure that no unnecessary work is performed if cancellation was requested when the task was started. When no exception is thrown, execution continues. At ④ we check for cancellation again so that a task cancelled while it was sleeping is reported as cancelled rather than as having completed successfully. Finally, at ⑤ we wait for the task to complete so we can handle any exceptions thrown by the task.
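One way to observe the outcome of cancellation is to look at the task’s Status after waiting on it. The sketch below takes a shortcut that the function above does not: it requests cancellation before the task is even created, so the body never runs. The cancelledTaskStatus name is hypothetical.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

let cancelledTaskStatus () =
    use tokenSource = new CancellationTokenSource()
    tokenSource.Cancel() // request cancellation before the task is created
    let t =
        Task.Factory.StartNew(
            Action(fun () -> printfn "this body should never run"),
            tokenSource.Token)
    try
        t.Wait()
    with
    | :? AggregateException as ex ->
        // Wait wraps the cancellation in an AggregateException.
        for inner in ex.InnerExceptions do
            printfn "Inner exception: %s" (inner.GetType().Name)
    t.Status

cancelledTaskStatus () |> printfn "Final status: %A"
```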

Exception Handling

Exceptions can be raised by any number of executing tasks at any time. When this happens, we need a way to capture and handle them. In the previous section, we handled the exception in a general manner by matching any exception and writing it to the console. If you executed the taskWithCancellation function, you may have noticed that the exception we caught wasn’t an OperationCanceledException but rather an AggregateException that included an OperationCanceledException. The base exception classes aren’t well suited for parallel scenarios because they represent only a single failure. To compensate, a new exception type, AggregateException, was introduced to allow us to report one or more failures within a single construct.

Although you certainly could handle an AggregateException directly, you’ll typically want to find a specific exception within it. For this, the AggregateException class provides the Handle method, which iterates over the exceptions contained within its InnerExceptions collection so you can find the exception you really care about and handle it accordingly.

try
    raise (AggregateException(
            NotSupportedException(),
            ArgumentException(),
            AggregateException(
                ArgumentNullException(),
                NotImplementedException())))
with
| :? AggregateException as ex ->
    ex.Handle(
        Func<_, _>( // ①
            function
            | :? AggregateException as ex1 -> // ②
                ex1.Handle( // ③
                    Func<_, _>(
                        function
                        | :? NotImplementedException as ex2 -> printfn "%O" ex2; true
                        | _ -> true))
                true
            | _ -> true))

Handling an AggregateException follows the familiar exception-handling pattern: We match against the AggregateException and bind it to the name ex as you might expect. Inside the handler, we invoke the Handle method ①, which accepts a Func<exn, bool>, indicating that the supplied function accepts an exception and returns a Boolean value. (To use pattern-matching functions as we’ve done here, we explicitly construct Func<_, _> instances and allow the compiler to infer the proper type arguments.) Inside the pattern-matching function ②, we detect whether we have a nested AggregateException and handle it at ③. At each level, we need to return a Boolean value indicating whether the particular exception was handled. If we return false for any exception, a new AggregateException that contains the unhandled exception will be raised.

Handling AggregateExceptions like this can get quite cumbersome, complex, and tedious. Fortunately, AggregateException provides another method, Flatten, which simplifies error handling by iterating over the InnerExceptions collection and recursing over each nested AggregateException to construct a new AggregateException instance that directly contains all of the exceptions within the source exception’s hierarchy. For example, we can revise the previous example to use Flatten to simplify the handler, like this:

try
    raise (AggregateException(
            NotSupportedException(),
            ArgumentException(),
            AggregateException(
                ArgumentNullException(),
                NotImplementedException())))
with
| :? AggregateException as ex ->
    ex.Flatten().Handle(
        Func<_, _>(
            function
            | :? NotImplementedException as ex2 -> printfn "%O" ex2; true
            | _ -> true))

In this revised example, we call Handle against the flattened AggregateException. With only one level to process, we can omit the checks for nested AggregateExceptions and handle the NotImplementedException directly.
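To make the effect of Flatten and of the handler’s Boolean result more concrete, consider this sketch. The aggregate helper is a hypothetical convenience that builds an AggregateException from a list, and the handler deliberately returns false for everything except NotImplementedException.

```fsharp
open System

// Hypothetical helper: builds an AggregateException from a list of exceptions.
let aggregate (inner : exn list) = AggregateException(inner :> seq<exn>)

let innerAggregate =
    aggregate [ ArgumentNullException() :> exn; NotImplementedException() :> exn ]

let nested =
    aggregate [ NotSupportedException() :> exn
                ArgumentException() :> exn
                innerAggregate :> exn ]

// Flatten lifts the two nested exceptions up beside the outer two.
let flatCount = nested.Flatten().InnerExceptions.Count

// Returning false marks an exception as unhandled; Handle then raises a new
// AggregateException containing only the unhandled exceptions.
let unhandledCount =
    try
        nested.Flatten().Handle(Func<exn, bool>(fun e -> e :? NotImplementedException))
        0
    with
    | :? AggregateException as ex -> ex.InnerExceptions.Count

printfn "Flattened: %i, unhandled: %i" flatCount unhandledCount
```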

Asynchronous Workflows

Despite the many improvements that the TPL brings to asynchronous and parallel programming, F# offers its own model, which better matches the functional paradigm emphasized by the language. While it’s sometimes desirable to use the TPL in F# (particularly when working across language boundaries) you’ll often turn to F#’s asynchronous workflows, which are best suited for I/O-based operations.

Asynchronous workflows provide a uniform and idiomatic way to compose and execute asynchronous code against the thread pool. Furthermore, their very nature often makes it difficult (if not impossible) to fall into some of the asynchronous traps present even in the TPL.

NOTE

Like our TPL discussion, this section is intended to give you a basic working knowledge of asynchronous workflows rather than serving as a comprehensive guide.

Creating and Starting Asynchronous Workflows

Asynchronous workflows are based on the Async<'T> class that resides in the Microsoft.FSharp.Control namespace. This type represents a bit of code you want to run asynchronously, ultimately returning some value. Instead of creating Async<'T> instances directly, though, we compose them through async expressions much like we compose sequences or queries.

Async expressions take the following form:

async { async-expressions }

Here, async-expressions represents one or more expressions that will participate in the asynchronous operation. In addition to the standard expressions we’ve seen throughout this book, asynchronous workflows allow you to easily invoke additional workflows and wait for results without blocking through specialized variants of some familiar keywords such as let and use. For instance, the let! keyword invokes an asynchronous workflow and binds the result to a name. Similarly, the use! keyword invokes an asynchronous workflow that returns a disposable object, binds the result to a name, and disposes of the object when it goes out of scope. It’s also possible to invoke an asynchronous workflow and immediately return the result with the return! keyword.
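The three keywords can be seen together in the following self-contained sketch. Everything in it is hypothetical scaffolding: delayed stands in for real asynchronous work, openResource returns a dummy disposable so that use! has something to clean up, and the log simply records the order of events.

```fsharp
open System

// Waits without blocking a thread, then yields a value.
let delayed (ms : int) (value : int) =
    async {
        do! Async.Sleep ms
        return value
    }

// A stand-in resource that records when it is opened and disposed.
let openResource (log : ResizeArray<string>) =
    async {
        log.Add "opened"
        return
            { new IDisposable with
                member x.Dispose() = log.Add "disposed" }
    }

let workflow (log : ResizeArray<string>) =
    async {
        use! resource = openResource log // disposed when the scope ends
        let! a = delayed 10 1            // bind a workflow's result
        let! b = delayed 10 2
        log.Add (sprintf "sum = %i" (a + b))
        return! delayed 10 (a + b)       // return another workflow's result
    }

// Async.RunSynchronously starts the workflow and blocks until it finishes.
let log = ResizeArray<string>()
let total = Async.RunSynchronously (workflow log)
printfn "%i %A" total (List.ofSeq log)
```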

To demonstrate, we’ll turn to the “hello world” example of asynchronous workflows: requesting multiple web pages. To begin, let’s define some functions that encapsulate the logic needed to create an asynchronous page request (note that a similar function, Http.AsyncRequestString, is available in the FSharp.Data framework):

open System
open System.IO
open System.Net

type StreamReader with
    member x.AsyncReadToEnd () =
        async { do! Async.SwitchToNewThread()
                let content = x.ReadToEnd()
                do! Async.SwitchToThreadPool()
                return content }

let getPage (uri : Uri) =
    async {
        let req = WebRequest.Create uri
        use! response = req.AsyncGetResponse()
        use stream = response.GetResponseStream()
        use reader = new StreamReader(stream)
        return! reader.AsyncReadToEnd()
    }

After opening the relevant namespaces, we extend the StreamReader class with a single AsyncReadToEnd method. This method, adapted from the F# PowerPack, is similar to the existing ReadToEndAsync method except that instead of using the TPL, it returns an asynchronous workflow that we can evaluate as the final step of the larger workflow in the getPage function where we describe how to make the page request. The overall flow of the expression is pretty standard: Create a WebRequest, wait for the response, and then explicitly return the response stream’s contents.

NOTE

The AsyncGetResponse method is an extension method defined in the F# core library. It conveniently wraps the standard .NET code within another asynchronous workflow, which makes it possible to employ use! and greatly simplifies the code.

It’s important to recognize that getPage doesn’t actually execute the request; it merely creates an instance of Async<string> that represents the request. This allows us to define multiple requests or pass them around to other functions. We can even execute the request multiple times. To execute the request we need to turn to the static Async class, which you can think of as a controller for asynchronous workflows.
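The distinction between defining and executing a workflow can be seen without any network access. The sketch below uses a hypothetical workflow, countedWork, that increments a counter each time its body runs; it assumes nothing beyond the core library.

```fsharp
// A ref cell counts how many times the workflow body actually runs.
let runCount = ref 0

let countedWork =
    async {
        runCount.Value <- runCount.Value + 1
        return runCount.Value }

// Defining the workflow runs nothing, so the counter is still zero here.
let countAfterDefinition = runCount.Value

// Each start evaluates the body again.
let first = Async.RunSynchronously countedWork
let second = Async.RunSynchronously countedWork
```

The counter stays at zero until the workflow is started, and each start runs the body again, which is the same reason a single getPage result can be executed more than once.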

There are a number of methods for starting an asynchronous workflow. Some common methods are listed in Table 11-1.

Table 11-1. Common Async Start Methods

| Method | Description |
| --- | --- |
| RunSynchronously | Starts an asynchronous workflow and waits for its result. |
| Start | Starts an asynchronous workflow but does not wait for a result. |
| StartImmediate | Starts an asynchronous workflow immediately using the current thread. Useful for UI updates. |
| StartWithContinuations | Immediately starts an asynchronous workflow using the current thread, invoking a success, exception, or cancellation continuation depending on how the operation completed. |

The method you choose is largely dependent upon what the workflow does, but you’ll typically use Start unless your application requires one of the others. The workflow created by the getPage function returns the result of a web request. Since we’re making the request, we probably don’t want to ignore the result, so we’ll need to wire up a continuation to do something with it. The easiest way to do that is to wrap the call to getPage inside another asynchronous expression, passing the result to another function when it completes, and starting the entire workflow with Start. Here, we call getPage and print the result:

async {
  let! content = Uri "http://nostarch.com" |> getPage
  content.Substring(0, 50) |> printfn "%s" }
|> Async.Start

USING ASYNC

The fact that Async is a static class rather than a module has ramifications for how you interact with it. Rather than providing let-bound functions as a module would, Async provides methods, many of which are overloaded primarily to aid in cancellation. Furthermore, Async’s methods are typically designed with a more object-oriented approach than is typical in the core F# libraries. Accordingly, their parameters are often tupled, making it difficult to use them with pipelining.
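To make the pipelining point concrete, here is a small sketch. The runWithTimeout helper is a hypothetical wrapper, not part of the core library; it simply turns a tupled method call into a curried function.

```fsharp
let answer = async { return 42 }

// With only the required argument, a method can still end a pipeline.
let piped = answer |> Async.RunSynchronously

// Optional arguments require method-call syntax with a tupled argument list.
let withTimeout = Async.RunSynchronously(answer, timeout = 1000)

// A hypothetical curried wrapper restores pipelining when the extra
// argument is needed.
let runWithTimeout (milliseconds : int) workflow =
    Async.RunSynchronously(workflow, timeout = milliseconds)

let pipedWithTimeout = answer |> runWithTimeout 1000
```

Wrapping a tupled method this way is a common workaround, though whether it is worth the extra function depends on how often the optional arguments are needed.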

Alternatively, we can use the StartWithContinuations method, which accepts an asynchronous workflow and three functions to invoke when the workflow finishes successfully, raises an exception, or is cancelled, respectively. The following code shows such an approach:

Async.StartWithContinuations(
  getPage(Uri "http://nostarch.com"),                 // ①
  (fun c -> c.Substring(0, 50) |> printfn "%s..."),   // ②
  (printfn "Exception: %O"),                          // ③
  (fun _ -> printfn "Cancelled")                      // ④
)

When the asynchronous operation ① completes successfully, the success continuation ② is invoked and the first 50 characters from the page source will be printed. Should the operation throw an exception, the exception continuation ③ will execute and print the exception. Finally, if the operation is cancelled, as described in Cancelling Asynchronous Workflows, the cancellation continuation ④ will execute and display a note informing the user of the cancellation.

Instead of relying on continuations, we can use the RunSynchronously method and get the result directly, like this:

let html =
  Uri "http://nostarch.com"
  |> getPage
  |> Async.RunSynchronously

Of course, running a single asynchronous workflow like this defeats the purpose of running it asynchronously because RunSynchronously waits for the result. Instead, RunSynchronously is often used in conjunction with Async.Parallel to run multiple workflows in parallel and wait for all of them to complete. For instance, we can make multiple requests, starting with an array of asynchronous workflows, as follows:

open System.Text.RegularExpressions

[| getPage(Uri "http://nostarch.com")
   getPage(Uri "http://microsoft.com")
   getPage(Uri "http://fsharp.org") |]
|> Async.Parallel
|> Async.RunSynchronously
|> Seq.iter (fun c -> let sample = c.Substring(0, 50)
                      Regex.Replace(sample, @"[\r\n]| {2,}", "")
                      |> printfn "%s...")

Here, we employ the Parallel method to combine each of the asynchronous workflows into a single workflow that is then piped to the RunSynchronously method. When each of the requests has completed, we iterate over the resulting array, stripping a few characters from the content for readability and printing the result.
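The same fork-and-join shape can be tried without a network connection. In this sketch, squareAsync is a hypothetical stand-in for an I/O-bound operation; it uses Async.Sleep to simulate latency.

```fsharp
// Simulated I/O: wait briefly, then return the square of the input.
let squareAsync n =
    async {
        do! Async.Sleep 50
        return n * n }

let squares =
    [ 1 .. 5 ]
    |> List.map squareAsync     // five independent workflows
    |> Async.Parallel           // combined into a single Async<int[]>
    |> Async.RunSynchronously   // wait for all of them
```

The resulting array keeps the order of the input workflows regardless of which one finishes first.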

Cancelling Asynchronous Workflows

In the previous section I indicated that asynchronous workflows can be cancelled. Just as in the TPL, asynchronous workflows use cancellation tokens to control cancellation. It’s possible, and sometimes even necessary, to manage tokens on your own, but in many cases you can rely on the Async class’s default token.

For simple scenarios, such as when you’re starting a single workflow via the Start or StartWithContinuations methods, you can use the CancelDefaultToken method to cancel the workflow, like this:

Async.StartWithContinuations(                         // ①
  getPage(Uri "http://nostarch.com"),
  (fun c -> c.Substring(0, 50) |> printfn "%s..."),
  (printfn "Exception: %O"),
  (fun _ -> printfn "Cancelled")
)

Async.CancelDefaultToken()                            // ②

The StartWithContinuations method ① monitors the default token and cancels the workflow when the token is marked as cancelled via the CancelDefaultToken method ②. In this example, because the workflow is cancelled before it completes, the cancellation callback is invoked instead of the success callback, resulting in the cancellation message being displayed.

The TryCancelled method, which accepts a workflow and a function that will be invoked when cancellation is requested, is a nice alternative for workflows that don’t return a value. Here, the displayPartialPage function wraps a call to getPage within another asynchronous workflow. The outer workflow waits for the response and writes out the first 50 characters when the message is received. Because TryCancelled returns yet another workflow and doesn’t automatically start it, we need to explicitly do so with a call to Start.

let displayPartialPage uri =
  Async.TryCancelled(
    async {
      let! c = getPage uri
      Regex.Replace(c.Substring(0, 50), @"[\r\n]| {2,}", "")
      |> sprintf "[%O] %s..." uri
      |> Console.WriteLine },
    (sprintf "[%O] Cancelled: %O" uri >> Console.WriteLine))

Async.Start(displayPartialPage (Uri "http://nostarch.com"))

Async.CancelDefaultToken()

The default token is often sufficient for cancelling workflows. When you’re executing multiple workflows and want to coordinate cancellation or if you want more control over cancellation, you can supply your own. Consider what happens when you request three pages but request cancellation with the default token.

[| Uri "http://nostarch.com"
   Uri "http://microsoft.com"
   Uri "http://fsharp.org" |]
|> Array.iter (fun u -> Async.Start(displayPartialPage u))

Async.CancelDefaultToken()

Executing the preceding code usually results in all three workflows being cancelled. (Usually, but not always, because there’s a chance that one or more workflows complete before the cancellation is handled.) To isolate each workflow’s cancellation, we can use an overload of the Start method that accepts a user-specified token, like this:

open System.Threading

let tokens =
  [| Uri "http://nostarch.com"
     Uri "http://didacticcode.com"
     Uri "http://fsharp.org" |]
  |> Array.map (fun u -> let ts = new CancellationTokenSource()         // ①
                         Async.Start(displayPartialPage u, ts.Token)    // ②
                         ts)

tokens.[0].Cancel()   // ③
tokens.[1].Cancel()   // ④

In this revised version, we use Array.map to map each Uri to a workflow with its own CancellationTokenSource created at ①. We then pass the associated token to Async.Start as the second argument ② before returning the CancellationTokenSource. Finally, at ③ and ④, respectively, we request cancellation of the first and second requests, allowing the third to proceed as normal.

What’s especially nice about cancelling asynchronous workflows is that, unlike the TPL, cancellation tokens are propagated through the entire workflow automatically. This means that you don’t have to manually ensure that each new workflow is given a token, leaving you with cleaner code.
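The propagation can be observed with a small, network-free sketch. Everything named here (started, cancelled, inner, outer, source) is hypothetical; the two events exist only so the example can wait for the workflow to begin and then confirm that the cancellation handler ran.

```fsharp
open System
open System.Threading

let started = new ManualResetEventSlim(false)
let cancelled = new ManualResetEventSlim(false)

// The inner workflow never mentions a token, yet it still observes
// cancellation of the workflow that invoked it.
let inner =
    async {
        while true do
            do! Async.Sleep 20 }

let outer =
    Async.TryCancelled(
        async {
            started.Set()
            do! inner },
        (fun (_ : OperationCanceledException) -> cancelled.Set()))

let source = new CancellationTokenSource()
Async.Start(outer, source.Token)

// Wait until the workflow is running so that cancellation arrives mid-flight.
started.Wait(5000) |> ignore
source.Cancel()

// True when the cancellation handler ran within five seconds.
let sawCancellation = cancelled.Wait(5000)
```

Only the outermost workflow was given a token; the loop inside inner stops because the token flows through do! automatically.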

Exception Handling

Because exceptions can and do occur within asynchronous workflows, it’s important to know how to handle them properly. There are a few exception-handling options available, but their utility may vary depending on what you’re doing.

The most uniform way to handle exceptions in an asynchronous workflow is to wrap the potentially offending code inside a try...with block within the async expression. For instance, we can provide a version of our getPage function that handles exceptions raised during the page request and read, like this:

let getPageSafe uri =
  async {
    try
      let! content = getPage uri
      return Some content
    with
    | :? NotSupportedException as ex ->
      Console.WriteLine "Caught NotSupportedException"
      return None
    | :? OutOfMemoryException as ex ->
      Console.WriteLine "Caught OutOfMemoryException"
      return None
    | ex ->
      ex |> sprintf "Caught general exception: %O" |> Console.WriteLine
      return None }

There’s nothing unusual about the try...with block in the preceding code—we simply wrap the asynchronous call to getPage in the try...with block and return a successful read as an option. Should the operation raise an exception, we match the exception type, print a message, and return None.

Another way to handle exceptions from asynchronous workflows is the Async.Catch method. Async.Catch takes a more functional approach than StartWithContinuations in that rather than accepting an exception-handling function, it wraps the workflow in another workflow whose result is a Choice<'T, exn>, where 'T is the original workflow’s return type and exn is the exception thrown by the workflow.

The Choice type is a discriminated union with two union cases: Choice1Of2 and Choice2Of2. For Async.Catch, Choice1Of2 represents successful completion of the workflow and contains the result, whereas Choice2Of2 represents failure and contains the first raised exception.

Handling exceptions with Async.Catch lets you structure your asynchronous code to create an idiomatic, pipelined data flow. For example, the following code shows how we can model an asynchronous operation as a series of function applications, beginning with a Uri.

Uri "http://nostarch.com"
|> getPage
|> Async.Catch
|> Async.RunSynchronously
|> function
   | Choice1Of2 result -> Some result
   | Choice2Of2 ex ->
      match ex with
      | :? NotSupportedException ->
        Console.WriteLine "Caught NotSupportedException"
      | :? OutOfMemoryException ->
        Console.WriteLine "Caught OutOfMemoryException"
      | ex ->
        ex.Message |> sprintf "Exception: %s" |> Console.WriteLine
      None

Here, a Uri is piped into the getPage function to create an asynchronous workflow. The resulting workflow is piped into Async.Catch to set up another workflow, which we then pipe to Async.RunSynchronously so we can wait for the result. Finally, we pipe the Choice into a pattern-matching function where we either return Some result or handle the exception before returning None.
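For experimenting with Async.Catch without making a web request, a sketch along the following lines may help. Both doubleIfValid and describeOutcome are hypothetical helpers invented for this example.

```fsharp
// Hypothetical workflow that fails for negative input.
let doubleIfValid n =
    async {
        if n < 0 then failwith "negative input"
        return n * 2 }

// Convert the Choice produced by Async.Catch into a readable string.
let describeOutcome workflow =
    workflow
    |> Async.Catch
    |> Async.RunSynchronously
    |> function
       | Choice1Of2 value -> sprintf "ok: %d" value
       | Choice2Of2 (ex : exn) -> sprintf "failed: %s" ex.Message

let success = doubleIfValid 21 |> describeOutcome
let failure = doubleIfValid (-1) |> describeOutcome
```

Because the exception arrives as an ordinary value, both outcomes flow through the same pipeline and no try...with block is required at the call site.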

Asynchronous Workflows and the Task Parallel Library

In addition to the ThreadPool-based asynchronous operations we’ve seen so far, the Async class provides a few methods for working with TPL tasks. Most notable among them are StartAsTask and AwaitTask.

The StartAsTask method invokes an asynchronous workflow as a TPL task. You would typically use this for CPU-bound operations or to expose an asynchronous workflow to code using the TPL in C# or Visual Basic. For instance, we can treat the result of our getPage function as a TPL task like this:

Uri "http://nostarch.com"
|> getPage
|> Async.StartAsTask
|> (fun t -> t.Result.Substring(0, 50))   // ①
|> printfn "%s"

The presence of the Result property at ① indicates that the result of StartAsTask is indeed a Task. In a more real-world scenario, you likely wouldn’t fire off a task and immediately block by waiting for the result, but this example is intended only to show how to start an asynchronous workflow as a TPL Task.

The StartAsTask method is handy when you need to create a new task, but what about when you need to handle an existing task? Consider the DownloadStringTaskAsync method added to the System.Net.WebClient class in .NET 4.5. This method serves the same purpose as our getPage function except that it encapsulates downloading a resource within a TPL task.

In C#, you can easily handle such methods with the async modifier and await operator, as shown here:

// C#
// using System.Threading.Tasks

private static async Task<string> GetPageAsync(string uri)   // ①
{
  using (var client = new System.Net.WebClient())
  {
    return await client.DownloadStringTaskAsync(uri);        // ②
  }
}

static void Main()
{
  var result = GetPageAsync("http://nostarch.com").Result;
  Console.WriteLine("{0}", result.Substring(0, 50));
  Console.ReadLine();
}

From a greatly simplified perspective, what happens in the preceding C# code is this: The async modifier ① is applied to the GetPageAsync method to signify that part of the method will run asynchronously. The await operator ② then signifies that execution should return to the caller and the remainder of the method should be treated as a continuation to be executed when the task completes.

Asynchronous workflows allow us to follow a similar pattern in F# using the AwaitTask method in combination with a TPL task and let!, use!, or return!. Here is the corresponding code in F#:

// F#
open System.Threading.Tasks

let getPageAsync (uri : string) =
  async {
    use client = new System.Net.WebClient()
    return! Async.AwaitTask (client.DownloadStringTaskAsync uri)   // ①
  }

async {
  let! result = getPageAsync "http://nostarch.com"                 // ②
  result.Substring(0, 50) |> printfn "%s"
} |> Async.Start

Although they’re not quite functionally equivalent (the C# version waits for the result in Main while the F# version passes the result to a continuation), the F# approach is similar to that of C#. In the F# version, the asynchronous workflow created by the getPageAsync function uses return! and Async.AwaitTask ① to wait for the task to complete before returning the result. Then, in the second asynchronous workflow, let! ② is used to evaluate getPageAsync, while printing the result is treated as a continuation.
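The two conversions can also be exercised together in a few lines. In this sketch an already-completed task created with Task.FromResult stands in for a real TPL operation; existingTask, addOne, and resultTask are hypothetical names.

```fsharp
open System.Threading.Tasks

// An already-completed task stands in for a real TPL operation.
let existingTask : Task<int> = Task.FromResult 20

let addOne =
    async {
        // AwaitTask turns the task into a workflow that let! can bind.
        let! value = Async.AwaitTask existingTask
        return value + 1 }

// StartAsTask goes the other way, exposing the workflow as a Task<int>.
let resultTask : Task<int> = Async.StartAsTask addOne

// Blocking on Result is for demonstration only.
let total = resultTask.Result
```

AwaitTask brings a task into a workflow and StartAsTask hands a workflow back to task-based code, so the two methods together cover both directions of interop.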

Agent-Based Programming

As if the TPL and asynchronous workflows didn’t make parallel and asynchronous programming accessible enough, F# has borrowed a message-processing mechanism from Erlang. The MailboxProcessor<'T> class implements a queue-based system for asynchronously routing messages (data items) to handlers using shared memory. This is especially useful in scenarios where multiple sources (clients) need to request something from a single target (server), the canonical example being a web server. Furthermore, because MailboxProcessor instances are extremely lightweight, an application can manage thousands of them without breaking a sweat. This fact enables mailbox processors to work independently or together by passing messages between instances.

MailboxProcessor instances are usually referred to as agents, and I’ll follow this convention throughout this section. In that regard, a common practice in agent-based programming is to alias MailboxProcessor<'T> as Agent<'T> as follows:

type Agent<'T> = MailboxProcessor<'T>

With the type aliased, we can create agents using the more convenient name.

Getting Started

I think the best way to understand agent-based programming is with an example. We’ll start with a simple agent that prints whatever is sent to it.

type Message = | Message of obj

let echoAgent =
  Agent<Message>.Start(                                // ①
    fun inbox ->
      let rec loop () =                                // ②
        async {
          let! (Message(content)) = inbox.Receive()    // ③
          printfn "%O" content
          return! loop()}                              // ④
      loop())                                          // ⑤

In the preceding code, we create an agent called echoAgent by passing a function to the Start method as shown at ①. By convention, the function’s parameter is called inbox because it’s the mailbox from which we’ll receive new messages. At ② we define the recursive loop function, which we’ll call continually to receive new messages.

NOTE

It’s certainly possible to loop imperatively using a while loop, but the recursive function is the more typical approach. Functional loops provide the additional benefit of easily allowing you to provide different looping logic when you need to manage multiple states. For instance, if your agent needs to behave differently in a paused state than a running state, you could define a pair of mutually recursive functions that both return a workflow that handles the corresponding state accordingly.

Inside the loop, we create an asynchronous workflow that first asynchronously receives a message from inbox using the Receive method as shown at ③. Next, the received message is printed before making an asynchronous recursive call to loop at ④. Finally, at ⑤ we initiate recursion by making a standard, synchronous call to loop.

With echoAgent actively listening, we can send it some messages via the Post method, like this:

> Message "nuqneH" |> echoAgent.Post;;

nuqneH

> Message 123 |> echoAgent.Post;;

123

> Message [ 1; 2; 3 ] |> echoAgent.Post;;

[1; 2; 3]

As you can see, when echoAgent receives a message, it is written to the console and then echoAgent waits for another message, and the process repeats ad infinitum.

Scanning for Messages

In the echoAgent example, we used the Receive method to get messages from the underlying queue. In many cases, Receive is appropriate, but it makes it difficult to filter messages because it removes them from the queue. To selectively process messages, you might consider using the Scan method instead.

Scanning for messages follows a different pattern than receiving them directly. Rather than processing the messages inline and always returning an asynchronous workflow, the Scan method accepts a filtering function that accepts a message and returns an Async<'T> option. In other words, when the message is something you want to process, you return Some wrapping an Async<'T>; otherwise, you return None. To demonstrate, let’s revise the echoAgent to process only strings and integers.

let echoAgent2 =
  Agent<Message>.Start(fun inbox ->
    let rec loop () =
      inbox.Scan(fun (Message(x)) ->
        match x with
        | :? string                             // ①
        | :? int ->                             // ②
          Some (async { printfn "%O" x
                        return! loop() })
        | _ -> printfn "<not handled>"; None)
    loop())

At ① and ② you can see standard dynamic type-test patterns used to filter incoming messages to strings and integers, respectively. When the message is one of those two types, we associate an asynchronous workflow with Some and return it. For all other messages, we return None. Scan then examines the returned value, and when it is Some, the message is consumed (removed from the queue) and the workflow is invoked. When the returned value is None, Scan immediately waits for another message.

Passing messages to echoAgent2 is the same as before—just pass the messages via the Post method:

> Message "nuqneH" |> echoAgent2.Post;;

nuqneH

> Message 123 |> echoAgent2.Post;;

123

> Message [ 1; 2; 3 ] |> echoAgent2.Post;;

<not handled>

Scanning for messages does offer some flexibility with how you process messages, but you need to be mindful of what you’re posting to the agent because messages not processed by Scan remain in the queue. As the queue size increases, scans will take longer to complete, so you can quickly run into performance issues using this approach if you’re not careful. You can see how many messages are in the queue at any time by inspecting the CurrentQueueLength property. If you need to remove messages from the queue, you can do so by invoking Receive for each message in the queue, but needing to do so is probably indicative of a larger design problem that should be addressed.

Replying to Messages

The agents we’ve created so far have been self-contained: They receive a message, do something with it, and wait for another message. Agents don’t have to work in isolation, though. One way you can make agents more interactive is by having them reply via an AsyncReplyChannel. To demonstrate, let’s revise echoAgent again, but this time, instead of printing a message within the agent, we’ll have it reply.

type ReplyMessage = | ReplyMessage of obj * AsyncReplyChannel<obj>   // ①

let echoAgent3 =
  Agent.Start(fun inbox ->
    let rec loop () =
      async {
        let! (ReplyMessage(m, c)) = inbox.Receive()   // ②
        c.Reply m                                     // ③
        return! loop()
      }
    loop())

The overall structure of echoAgent3 doesn’t differ much from the previous versions. For convenience, we’re using a discriminated union ① for our message type as is typical in agent-based programming. In this case, the ReplyMessage union type has a single case with two associated values, an object and the reply channel.

Inside the loop body, we use pattern matching ② to identify the union case and extract the message and channel. We then pass the message to the channel’s Reply method ③ before repeating. Now all that’s left is to send a message to the agent.

ReplyMessage’s second value is an AsyncReplyChannel<obj>, as you’ve already seen. In theory we could manually construct a reply channel and send the ReplyMessage to the agent with the Post method, but then we’d have to handle waiting for the result manually. There are much better ways to get the reply channel—namely, the PostAndReply method and its variants.

The PostAndReply methods differ a bit from Post in that, instead of accepting the message directly, they are higher-order functions that accept a function that takes in a preconstructed reply channel and returns the fully constructed message. For our purposes, we’ll simply create a ReplyMessage like this:

echoAgent3.PostAndReply(fun c -> ReplyMessage("hello", c))

|> printfn "Response: %O"

Internally, PostAndReply (and its variants) construct reply channels that they pass on to the supplied function, which then creates the message that is ultimately posted to the agent.
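As a rough illustration of the variants, the following sketch calls PostAndReply, PostAndAsyncReply, and TryPostAndReply against a small agent. The Request type and the upperAgent agent are hypothetical; the alias is repeated so the snippet stands alone.

```fsharp
type Agent<'T> = MailboxProcessor<'T>

// Hypothetical message type: some text plus the channel to reply on.
type Request = Echo of string * AsyncReplyChannel<string>

let upperAgent =
    Agent<Request>.Start(fun inbox ->
        let rec loop () =
            async {
                let! (Echo (text, channel)) = inbox.Receive()
                channel.Reply(text.ToUpperInvariant())
                return! loop () }
        loop ())

// Blocks the calling thread until the agent replies.
let blocking = upperAgent.PostAndReply(fun ch -> Echo("hello", ch))

// Waits for the reply inside a workflow without blocking a thread.
let nonBlocking =
    async {
        let! reply = upperAgent.PostAndAsyncReply(fun ch -> Echo("world", ch))
        return reply + "!" }
    |> Async.RunSynchronously

// Returns None instead of raising an exception if no reply arrives in time.
let timed = upperAgent.TryPostAndReply((fun ch -> Echo("hi", ch)), timeout = 1000)
```

PostAndAsyncReply is usually the better fit when the caller is itself a workflow or another agent, since it avoids tying up a thread while waiting.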

Example: Agent-Based Calculator

Now that you’ve seen a variety of ways to create and interact with agents, let’s look at a more interesting example that ties together several of the concepts for something a bit more useful than simply regurgitating its input: an agent-based calculator. We’ll begin by defining a discriminated union that represents the messages the calculator will support.

type Operation =
| Add of float
| Subtract of float
| Multiply of float
| Divide of float
| Clear
| Current of AsyncReplyChannel<float>

The Operation union type defines six cases. Of those, four represent basic mathematical operations and have an associated float that is used in the calculation. The Clear case allows us to clear the stored value. Finally, the Current case lets us interrogate the agent for its current value using its associated reply channel. From this definition, we can create a new agent that handles each case as follows:

let calcAgent =
  Agent.Start(fun inbox ->
    let rec loop total =
      async {
        let! msg = inbox.Receive()
        let newValue =
          match msg with
          | Add x -> total + x
          | Subtract x -> total - x
          | Multiply x -> total * x
          | Divide x -> total / x
          | Clear -> 0.0
          | Current channel ->
            channel.Reply total
            total
        return! loop newValue }
    loop 0.0)

Even though calcAgent appears to keep a running total, it is a bit of an illusion in that we keep state only by passing a value (total) to the recursive loop function. When calcAgent receives a message, it uses pattern matching to determine the appropriate action, binding the result to newValue. For instance, when it receives an Add, Subtract, Multiply, or Divide operation, it applies the corresponding mathematical operation to total. Similarly, when it receives a Clear operation, it simply returns 0.0 and Current returns total after replying.

To see calcAgent in action, we just need to send it some messages:

[ Add 10.0
  Subtract 5.0
  Multiply 10.0
  Divide 2.0 ]
|> List.iter (calcAgent.Post)

calcAgent.PostAndReply(Current) |> printfn "Result: %f"

calcAgent.Post(Clear)

calcAgent.PostAndReply(Current) |> printfn "Result: %f"

In the preceding snippet, we simply pass a list of Operations to List.iter, posting each message to calcAgent. When those have been processed, we query for the current value, clear, and then query again to ensure that the total has been zeroed out. Invoking the preceding snippet results in the following:

Result: 25.000000

Result: 0.000000
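The same state-passing technique extends to agents with more than one mode, as suggested in the earlier note about mutually recursive loop functions. The following is a sketch only: the Command type and pausableAgent are hypothetical, and dropping work while paused is an arbitrary design choice made for the example.

```fsharp
type Agent<'T> = MailboxProcessor<'T>

// Hypothetical message type for an agent that can be paused and resumed.
type Command =
    | Pause
    | Resume
    | Work of string
    | GetLog of AsyncReplyChannel<string list>

let pausableAgent =
    Agent<Command>.Start(fun inbox ->
        // Running state: work items are recorded (newest first).
        let rec running log =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Pause -> return! paused log
                | Resume -> return! running log
                | Work item -> return! running (item :: log)
                | GetLog channel ->
                    channel.Reply(List.rev log)
                    return! running log }
        // Paused state: work items are discarded until Resume arrives.
        and paused log =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Resume -> return! running log
                | Pause | Work _ -> return! paused log
                | GetLog channel ->
                    channel.Reply(List.rev log)
                    return! paused log }
        running [])

[ Work "a"; Pause; Work "b"; Resume; Work "c" ]
|> List.iter pausableAgent.Post

// Messages are handled in order, so the reply reflects all five posts.
let processed = pausableAgent.PostAndReply(GetLog)
```

Each state is just a function that returns a workflow, so switching modes amounts to choosing which function to call with return!.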

Summary

Asynchronous and parallel programming have long been viewed as tools for specialized software and reserved for experienced developers. With processor manufacturers improving processor performance by adding cores instead of increasing clock speed, software developers can no longer solve performance issues solely by upgrading hardware, nor can they continue expecting users to wait for long-running operations to complete before returning control.

Languages such as F# make asynchronous and parallel programming more accessible by providing multiple, robust mechanisms. The TPL makes it easy for developers to efficiently handle CPU-bound operations such as processing large data sets while effectively using available system resources. Language features such as asynchronous workflows excel at keeping applications responsive during IO-based operations such as web requests or file accesses. Finally, agent-based programming lets you easily coordinate complex systems by firing off individual asynchronous processes without having to directly manage the complexity of traditional thread-based models. Together, these approaches help you build scalable, responsive applications that meet the demands of modern computing while keeping you focused on the actual problems your software is trying to solve.