F# Deep Dives (2015)

Part 3. Developing complete systems

Chapter 8. Asynchronous and agent-based programming

Colin Bull

The real world consists of independent entities interacting through asynchronous exchanges of information. Programming languages that provide abstractions for asynchronous computations and agents are ideally suited for modeling real-world problems. This chapter introduces asynchronous workflows and agent-based concurrency in F#.

All about asynchronous workflows

Programming in the functional style promotes a compositional approach to problem solving. The solution to a complex problem can be composed from the solution to simpler subproblems. Pure functions and immutable data structures are the fundamental building blocks used to implement solutions using this approach. Code written in this style is clear, concise, and easy to reason about.

Unfortunately, for any program to be useful, it also has to interact via side effects with the real world. Interactions with the real world involve coming to grips with functional programming’s so-called “awkward squad,” including I/O and concurrency. A challenge for any functional-first programming language is how to express computations that interact with the real world without compromising the benefits of the compositional approach to problem solving.

The F# programming language provides abstractions for elegantly expressing asynchronous computations that can be used for I/O and concurrency without losing the benefits of composition. This chapter will show how you can use the asynchronous workflows and agents provided by F# to solve the real-world problems you’ll encounter in your work as an enterprise software developer without losing any of the clarity, conciseness, or correctness of the code that you’re used to when programming in a functional style.

What are asynchronous workflows?

Using asynchronous operations in your code can lead to increased performance, especially when dealing with databases, web services, or other I/O. But writing asynchronous code often involves a significant amount of nontrivial boilerplate, and if it’s not done properly it can lead to degraded performance and subtle bugs. Asynchronous workflows aim to simplify the asynchronous programming model by removing a lot of the boilerplate:

This code creates an asynchronous computation that creates a web request and asynchronously gets the response. The code then opens the response stream and passes it to a handler function, the transformer parameter. The type of transformer is Stream -> Async<'a>: it takes a Stream as its single parameter and must return an Async<'a>.
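The listing that this paragraph describes is not reproduced in this text. A minimal sketch that matches the description could look like the following. The parameter names url, methd, and transformer are assumptions chosen to fit the call made in listing 1, and AsyncGetResponse is the asynchronous extension that the F# core library adds to WebRequest.

```fsharp
open System.IO
open System.Net

// Sketch only: parameter names are assumptions, not the book's code.
let doWebRequest (url: string) (methd: string) (transformer: Stream -> Async<'a>) =
    async {
        // Create the request and set the HTTP verb ("GET", "POST", ...).
        let request = WebRequest.Create(url)
        request.Method <- methd
        // Wait for the response without blocking a thread.
        use! response = request.AsyncGetResponse()
        // Hand the response stream to the caller-supplied handler.
        use stream = response.GetResponseStream()
        return! transformer stream
    }
```

Because the handler returns an Async<'a>, the caller decides what the whole request produces: a string, a parsed document, or anything else that can be read from the stream.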

This code asynchronously reads the entire incoming stream as a string:

> let readStreamAsString (stream:Stream) =
    async {
        use streamReader = new StreamReader(stream)
        return! streamReader.ReadToEndAsync() |> Async.AwaitTask
    }

val readStreamAsString : stream:Stream -> Async<string>

You can now combine these two functions to download a web page, as shown in the following listing.

Listing 1. Composing asynchronous workflows (F# Interactive)

> let result =
    doWebRequest "http://www.google.com" "GET" readStreamAsString
    |> Async.RunSynchronously

val result : string =
  "<!doctype html><html itemscope="" itemtype="http://schema.org"+[43426 chars]

This is a nice example of how asynchronous workflows can be composed to build more complex behaviors.

Meet the agents

The goal of the agent/actor programming model is to simplify writing concurrent applications. The programming model was first introduced by Hewitt, Bishop, and Steiger’s paper in 1973.[1] In the actor model, everything is an actor, in the same way everything is an object in object-oriented programming (OOP). In the formal definition, actors can do the following:

· Send messages to each other

· Change the actor’s message-handling behavior for the next message

· Create new actors

· Process messages in any order

1 Carl Hewitt, Peter Bishop, and Richard Steiger, “A Universal Modular ACTOR Formalism for Artificial Intelligence,” Proceedings of the 3rd International Joint Conference on Artificial Intelligence, 1973.

In F#, the MailboxProcessor<'a> type represents the fundamental element of this computational model. MailboxProcessor<'a> is backed by a queue. Messages can be posted to the queue asynchronously and then processed in the actor loop, as shown in the next listing.[2]

2 For more information about F# agents and the MailboxProcessor type, see “Server-Side Programming with F# Agents,” MSDN, http://mng.bz/44zg; or Tomas Petricek, “An Introduction to F# Agents,” Developer Fusion, http://mng.bz/59L2.

Listing 2. Creating an agent (F# Interactive)

This code creates an agent that, on receipt of a message, downloads a string from the specified URL, stores it in a cache, and returns the result.
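The code for listing 2 is not reproduced in this text. As a sketch of the agent described above, the following uses a Request type with a single Get case, matching the call shown next. The createDownloadAgent function and its download parameter are hypothetical: download stands in for fun url -> doWebRequest url "GET" readStreamAsString so that the sketch stays self-contained.

```fsharp
// Message type: a URL plus the channel on which to send the reply.
type Request =
    | Get of string * AsyncReplyChannel<string>

// Sketch only: `download` is an assumed stand-in for the web request function.
let createDownloadAgent (download: string -> Async<string>) =
    MailboxProcessor<Request>.Start(fun inbox ->
        // The cache is carried as loop state, so no mutable field is needed.
        let rec loop (cache: Map<string, string>) =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Get(url, reply) ->
                    match cache.TryFind(url) with
                    | Some cached ->
                        // Cache hit: reply without touching the network.
                        reply.Reply(cached)
                        return! loop cache
                    | None ->
                        // Cache miss: download, reply, and remember the result.
                        let! downloaded = download url
                        reply.Reply(downloaded)
                        return! loop (Map.add url downloaded cache)
            }
        loop Map.empty)
```

Because the agent processes one message at a time, the cache is never touched concurrently, which is why an immutable Map threaded through the loop is enough.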

You can call this agent and get a result by posting a message of type Request to the agent:

> let agentResult =
    downloadAgent.PostAndReply(fun reply -> Get("http://www.google.com", reply))

val agentResult : string =
  "<!doctype html><html itemscope="" itemtype="http://schema.org"+[43426 chars]

Repeating this call to the same URL won’t result in another network call being made; instead, the value will be served directly from the cache.

The previous example directly uses the MailboxProcessor<'a> defined as part of the F# core library. Although it’s a good, robust implementation of an actor, using the MailboxProcessor<'a> directly in a large actor system can cause some problems.

One such problem comes from directly referencing instances of actors. This approach quickly becomes unmanageable and hard to reason about, much like directly constructing types in a large object-oriented system. Typically, in a large object-oriented solution, you’d use some sort of container to manage the references for you, and then ask the container for the types when you need them. The same is true in a large actor system; in fact, you can broadly think of actors as being equivalent to classes. Another problem worth mentioning is that MailboxProcessor<'a> is built on an unbounded queue, which can cause memory issues when a system gets busy: queues can quickly build up when processing individual messages takes a long time. For these reasons, you may not want to use MailboxProcessor<'a> directly as your agent implementation. To address both problems, the following listing introduces some abstractions over an agent.

Listing 3. Abstracting an agent

Listing 3 creates a simple abstract class, AgentRef<'a>, that encapsulates the basics of an agent. It also defines an interface, IAsyncReplyChannel<'a>, and an implementation of it, MailboxReplyChannel<'a>. Together these support the PostAndTryAsyncReply functionality; they’re necessary because the AsyncReplyChannel<'a> type defined in Microsoft.FSharp.Control, which supports the post-and-reply functionality, is currently sealed. Providing this interface lets you create different channels that agents can reply on. The next listing uses these types to define an agent.
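The code for listing 3 is not reproduced in this text. The following sketch shows one shape these abstractions could take, inferred from how listings 4, 6, and 9 use them: a non-generic AgentRef base that the registry can store, and a generic AgentRef<'a> that adds the messaging members. The exact member signatures are assumptions.

```fsharp
// A reply channel that agents can answer on, independent of MailboxProcessor.
type IAsyncReplyChannel<'a> =
    abstract Reply : 'a -> unit

// Adapts the sealed AsyncReplyChannel<'a> to the interface above.
type MailboxReplyChannel<'a>(channel: AsyncReplyChannel<'a>) =
    interface IAsyncReplyChannel<'a> with
        member x.Reply(msg) = channel.Reply(msg)

// Non-generic base, so agents with different message types can share one registry.
[<AbstractClass>]
type AgentRef(id: string) =
    member x.Id = id
    abstract Start : unit -> unit

// Generic agent reference carrying the message type.
[<AbstractClass>]
type AgentRef<'a>(id: string) =
    inherit AgentRef(id)
    abstract Receive : unit -> Async<'a>
    abstract Post : 'a -> unit
    abstract PostAndTryAsyncReply : (IAsyncReplyChannel<'b> -> 'a) -> Async<'b option>
```

Keeping the reply channel behind an interface is what later allows an agent that is not built on MailboxProcessor<'a> to reply through a channel of its own.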

Listing 4. Defining an agent

type Agent<'a>(id:string, comp, ?token) =
    inherit AgentRef<'a>(id)
    let mutable agent = None

    override x.Receive() = agent.Value.Receive()
    override x.Post(msg:'a) = agent.Value.Post(msg)
    override x.PostAndTryAsyncReply(builder) =
        agent.Value.PostAndTryAsyncReply(fun rc ->
            builder(new MailboxReplyChannel<_>(rc)))
    override x.Start() =
        let mbox = MailboxProcessor.Start((fun inbox ->
            comp (x :> AgentRef<_>)), ?cancellationToken = token)
        agent <- Some mbox

This code creates a type, Agent<'a>. This type wraps and exposes some of the functionality provided by MailboxProcessor<'a>. The type delegates calls to the underlying agent, which is created when the agent is started. For PostAndTryAsyncReply, you can’t delegate the call directly to the agent. This is because the type definition for PostAndTryAsyncReply requires the builder function to accept an IAsyncReplyChannel<'a>. The AsyncReplyChannel<'a> instance passed by MailboxProcessor doesn’t implement this interface; therefore you use the MailboxReplyChannel<'a> type to wrap this instance in a type that does support that interface.

Addressing agents

Using the new abstraction, you can now create a local agent.

Listing 5. Creating an agent (F# Interactive)

> open FSharpDeepDives

> let console =
    let agent =
        Agent("console-writer", (fun (agent:AgentRef<string>) ->
            let rec loop() =
                async {
                    let! msg = agent.Receive()
                    printfn "%s" msg
                    return! loop()
                }
            loop()
        ))
    agent.Start()
    agent

val console : FSharpDeepDives.DomainTypes.Agent<string>

You can then post to the agent by directly referencing the agent and calling the Post method with the message:

> console.Post("Writing through an agent")

As mentioned previously, in smaller systems, directly referencing the agents can be acceptable. But as a system grows, it becomes necessary to provide an abstraction that allows you to indirectly reference an agent. To do so, you can introduce an addressing scheme for agents.

Listing 6. A simple agent registry

module Registry =

    let mutable private agents = Map.empty<string, AgentRef list>

    let register (ref:AgentRef<'a>) =
        match Map.tryFind ref.Id agents with
        | Some(refs) ->
            agents <- Map.add ref.Id ((ref :> AgentRef) :: refs) agents
        | None ->
            agents <- Map.add ref.Id [ref :> AgentRef] agents
        ref

    let resolve id =
        Map.find id agents

This is a minimal implementation of an agent registry. The registry is a Map<string, AgentRef list> that’s used to store and retrieve agents. The Registry module allows many agents to be assigned to a single ID; this provides primitive broadcast semantics. To use the Registry, be sure that whenever an agent is created, it’s registered, as shown next.

Listing 7. Registering an agent

> open FSharpDeepDives

> let console =
    let agent =
        Agent("console-writer", (fun (agent:AgentRef<string>) ->
            let rec loop() =
                async {
                    let! msg = agent.Receive()
                    printfn "%s" msg
                    return! loop()
                }
            loop()
        ))
    // register returns the reference it was given; it isn't needed here
    agent |> Registry.register |> ignore
    agent.Start()
    // return the agent so that console is bound to it
    agent

val console : FSharpDeepDives.DomainTypes.Agent<string>

Once an agent is registered, you can resolve to the agent by name.

Listing 8. Resolving and posting to an agent (F# Interactive)

> Registry.resolve "console-writer"
  |> List.iter (fun a -> (a :?> AgentRef<string>).Post("Hello"))

Although resolving agents in this manner has the additional cost of the lookup (which will impact performance if done in tight loops), the flexibility it provides usually outweighs that cost.

Tidying up the implementation

Currently, listings 7 and 8 are functional but ugly. It would be nice if you could tidy up the implementation and make things like registration and posting more fluent. Fortunately, you can create a module that contains some helper functions for tidying up the interface with your agents, as you can see in the next listing.

Listing 9. Providing some helper functions

module Agent =

    let start (ref:AgentRef) =
        ref.Start()
        ref

    let spawn ref =
        Registry.register ref
        |> start

    let post (refs:#seq<AgentRef>) (msg:'a) =
        refs |> Seq.iter (fun r -> (r :?> AgentRef<'a>).Post(msg))

    let postAndTryAsyncReply (refs:#seq<AgentRef>) msg =
        refs
        |> Seq.map (fun r ->
            (r :?> AgentRef<'a>).PostAndTryAsyncReply(msg))
        |> Async.Parallel

    let postAndAsyncReply (refs:#seq<AgentRef>) msg =
        async {
            let! responses = postAndTryAsyncReply refs msg
            return responses |> Seq.choose id
        }

    let postAndReply (refs:#seq<AgentRef>) msg =
        postAndAsyncReply refs msg |> Async.RunSynchronously

    let resolve id = Registry.resolve id

In addition to hiding some implementation details (how agents are registered), providing a module like this allows you to build a mini domain-specific language (DSL) for your agent type, as shown in listing 10. Doing so allows you to use a more fluent API when speaking the language of agents.

Listing 10. Creating an agent with a mini-DSL (F# Interactive)

> open FSharpDeepDives
> open FSharpDeepDives.Agent

> let console =
    Agent("console-writer", fun (agent:AgentRef<string>) ->
        let rec loop() =
            async {
                let! msg = agent.Receive()
                printfn "%s" msg
                return! loop()
            }
        loop()
    )
    |> spawn

val console : FSharpDeepDives.DomainTypes.Local<string>

Listing 10 creates an agent identical to that in listing 7, but the DSL removes some of the boilerplate and automatically registers each agent so it can be resolved by name. But this hasn’t reduced the code much when compared to listing 7. The real benefit is realized when you consider resolving and posting to an agent. Listing 8 now becomes

> resolve "console-writer" |> post <| "Hello, world"

val it : unit = ()

This code feels far more natural, but confusion might arise from the use of the affectionately termed tie-fighter syntax. Only a small set of terms can be treated as infix operators in F#. This syntax overcomes that by exploiting the curried nature of the post function. It’s exactly equivalent to the following:

> post (resolve "console-writer") "Hello, world"

It’s a question of personal preference as to which style you use. Usually we just pick the form that reads the best for the given situation.
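The same trick works with any curried two-argument function. The small example below uses a hypothetical subtract function, chosen because argument order matters, to show that the value on the left of |> becomes the first argument and the value on the right of <| becomes the second.

```fsharp
// Hypothetical helper used only to make the argument order visible.
let subtract x y = x - y

// |> and <| have the same precedence and associate to the left,
// so this reads as (10 |> subtract) <| 3, which is subtract 10 3.
let viaOperators = 10 |> subtract <| 3

// The plain application it is equivalent to.
let direct = subtract 10 3

printfn "%d %d" viaOperators direct   // prints "7 7"
```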

Request-response from agents

So far in this section we’ve only shown how to post to agents. Posting to agents is a one-way activity; it’s somewhat analogous to calling a method with void as a return type in C#. But in C# you can define methods that return any type you want. So how do you do this with agents? Well, when you defined the interface for Agent<'a>, in addition to the Post method you defined a method called PostAndTryAsyncReply. When you post a message to an agent, it’s asynchronously placed onto a queue to wait to be processed. The asynchronous nature of posting a message to an agent now causes a problem, because you have to wait an unknown amount of time for the agent to get to your message and reply. To return the reply to the original caller, you need to create a channel that the agent can post to with a response. Your calling code can then asynchronously wait on that channel for a response. F# provides AsyncReplyChannel<'a>, which offers the exact behavior required. Unfortunately, this type is sealed and has an internal constructor, which makes it difficult to reuse. But this has already been abstracted in listing 3 with the IAsyncReplyChannel<'a> interface and the MailboxReplyChannel<'a> type. You’ll see further uses of this abstraction later in this chapter.
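To make the reply-channel idea concrete, here is a small sketch that uses the plain MailboxProcessor<'a> from the F# core library rather than the chapter’s abstractions. The echo agent is a hypothetical example: each message carries its own reply channel, and the caller can either block on the reply or obtain an Async<string> to wait on later.

```fsharp
// Each message pairs a payload with the channel on which to answer.
let echo =
    MailboxProcessor<string * AsyncReplyChannel<string>>.Start(fun inbox ->
        let rec loop () =
            async {
                let! (text, reply) = inbox.Receive()
                // Send the response back to whoever posted this message.
                reply.Reply(text.ToUpper())
                return! loop ()
            }
        loop ())

// Blocks the calling thread until the agent replies.
let blocking = echo.PostAndReply(fun rc -> ("hello", rc))

// Returns immediately with an Async<string>; nothing blocks until it is run.
let pending = echo.PostAndAsyncReply(fun rc -> ("world", rc))
let later = pending |> Async.RunSynchronously
```

The lambda passed to PostAndReply and PostAndAsyncReply receives the freshly created channel and builds the message around it, which is the same builder pattern that PostAndTryAsyncReply uses in the chapter’s AgentRef<'a>.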

When we first introduced agents, we showed you an example of downloading and caching the response from a URL. This example used PostAndReply, which does exactly as described earlier. But rather than asynchronously waiting for the response, PostAndReply blocks until a response is received. To achieve the same thing with the new DSL, the internal loop code is almost identical; the only change is the function you use to post to the agent, as you can see in the following listing.

Listing 11. URL cache with the agent DSL

let urlReader =
    Agent("url-reader", fun (agent:AgentRef<Request>) ->
        let rec loop (cache : Map<string, string>) = async {
            let! msg = agent.Receive()
            match msg with
            | Get(url, reply) ->
                match cache.TryFind(url) with
                | Some(result) ->
                    reply.Reply(result)
                    return! loop cache
                | None ->
                    let! downloaded =
                        doWebRequest url "GET" readStreamAsString
                    reply.Reply(downloaded)
                    return! loop (Map.add url downloaded cache)
        }
        loop Map.empty)
    |> spawn

let agentResult =
    resolve "url-reader" |> postAndReply <|
        (fun rc -> Get("http://www.google.com", rc))
    |> Seq.iter (printfn "%A")

val agentResult : string =
  "<!doctype html><html itemscope="" itemtype="http://schema.org"+[43426 chars]

Exercise 1

In listing 11, you add an entry to the cache every time a new URL is passed to the agent. But the content behind a URL changes over time, and keeping every requested URL in memory forever is wasteful. Your task is to implement expiry of cached entries for the agent.

So far you’ve explored agents and the abstractions you need so that they can be used in a large-scale system. You created a DSL that uses these abstractions and allows a consumer to resolve and post to an agent’s name. Having these abstractions will go a long way in helping you to build a large system based on agents.

Extracting data from the world

The world is full of data in many different forms. In enterprises, almost all systems extract data from a source, transform it into a domain model, and then either perform analysis on the data or save the results for later reporting. Often, downstream systems pick up this data, and the cycle continues. This process is trivial when you control both the source of the data and the sink; but frequently this isn’t the case. Data often comes from a source external to the company.

Data comes in many shapes and formats. CSV, TSV, and XML are some common formats, and JSON is becoming increasingly popular. This list is not exhaustive, and other formats can be somewhat more interesting to extract data from—PDFs or images, for example. How can you exploit your knowledge of agents and asynchronous workflows in F# to get this data into your system? We can break down almost all data-collection problems into three steps: extract, transform, and load (ETL). In the next section, you’ll see how to use agents to build an ETL pipeline.

The extract, transform, and load (ETL) model

The ETL model can be easily defined in F#, as the following listing shows.

Listing 12. Simple ETL function

let etl extractf transformf loadf =
    extractf()
    |> transformf
    |> loadf

This is about as simple as you can get—and it works. You can, for example, write a simple set of functions to parse a CSV file into a string[][] type and then write the contents out as JSON to another file.

Listing 13. Using the ETL function

open System
open System.IO
open Newtonsoft.Json

let readFile path =
    (fun () -> File.ReadAllText(path))

let writeJson path input =
    (path, JsonConvert.SerializeObject(input))
    |> File.WriteAllText

let split (char:string) (input:string) =
    input.Split([|char|], StringSplitOptions.RemoveEmptyEntries)

let parseCsv (input:string) =
    split Environment.NewLine input |> Array.map (split ",")

let result =
    etl
        (readFile "c:\data.csv")
        parseCsv
        (writeJson "c:\data.json")

As functional as this is, you’re only dealing with the simplest case. For example, to handle errors in this scenario you’d have to wrap the entire function in a try...with block. But this isn’t ideal, because it gives you little information about which stage failed. In listing 13, an IOException could occur at either the extract (readFile) or load (writeJson) stage, and you’d need a debugger to get to the bottom of this exception. In the real world, you often have little control over the source system; so, as shown in listing 14, your system should always be designed to be robust and catch errors in a way that helps the support team diagnose what has gone wrong.

Listing 14. Protected running of each stage

let etl extractf transformf loadf =
    let stage name success input =
        match input with
        | Choice1Of2(output) ->
            try Choice1Of2 <| success output with e -> Choice2Of2 (name,e)
        | Choice2Of2(err) -> Choice2Of2 err

    stage "extract" extractf (Choice1Of2 ())
    |> stage "transform" transformf
    |> stage "load" loadf

Here you’re using the Choice<'a, 'b> type to protect the execution of each stage. If you get an error during the transform stage, the last stage, load, will forward this error on. When an error occurs, the error is wrapped in a Choice and has the name of the stage attached to it to aid diagnostics. At this point you could add any amount of metadata to the error. If you try to run this when the source file doesn’t exist, you’ll get a result like the following:

val result : Choice<unit,(string * exn)> =
  Choice2Of2
    ("extract",System.IO.FileNotFoundException: Could not find file
     'data_!.csv'...)

You can now capture errors that may occur at each stage and enrich them as you see fit.
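The forwarding behavior can also be seen without touching the file system. The sketch below repeats the etl function from listing 14 so that it is self-contained, and then runs it with three small in-memory stages (hypothetical stand-ins for real extract, transform, and load functions) in which the transform stage fails.

```fsharp
// The etl function from listing 14, repeated so the example runs on its own.
let etl extractf transformf loadf =
    let stage name success input =
        match input with
        | Choice1Of2 output ->
            try Choice1Of2 (success output) with e -> Choice2Of2 (name, e)
        | Choice2Of2 err -> Choice2Of2 err
    stage "extract" extractf (Choice1Of2 ())
    |> stage "transform" transformf
    |> stage "load" loadf

// Hypothetical stages: "x" is not a number, so the transform stage throws.
let outcome =
    etl
        (fun () -> "1,2,x")
        (fun (line: string) -> line.Split(',') |> Array.map int)
        (fun (numbers: int[]) -> Array.sum numbers)

match outcome with
| Choice1Of2 total -> printfn "Loaded total %d" total
| Choice2Of2 (stageName, err) -> printfn "Stage %s failed: %s" stageName err.Message
```

The load stage never runs its function here; it receives the Choice2Of2 value from the transform stage and passes it through unchanged, so the stage name attached to the error is "transform".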

Making things asynchronous

In general, the extract and load stages of an ETL system involve some sort of I/O, whether it’s reading from a file as in the previous example or writing the transformed results to a database. You should make these stages asynchronous to prevent your ETL solution from blocking the entire process while it executes. This means you can have more than one ETL pipeline executing at any one time, allowing you to parallelize this work. To make your ETL function asynchronous, you’ll use F#’s existing Async<'a> computation expression. To make use of it, you need to change your definition of the stage function.

Listing 15. Asynchronous stage function

The stage function in listing 15 is fairly similar to the function implemented earlier. The major difference is that you now have a somewhat nasty representation for tying together the stages. F# comes with a built-in solution to this problem: as shown in listing 16, you can create a computation expression builder that does this sugaring for you and leaves you with a more imperative syntax to work with.
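The code for listing 15 is not reproduced in this text. To illustrate the wiring problem the paragraph refers to, the sketch below uses a stage function with the same shape as the one in listing 16 and composes three stages by hand. Treat it as an assumption about what such code looks like, not as the original listing.

```fsharp
// Runs one asynchronous stage, catching any exception it raises.
// On success the result is handed to `rest`, the remainder of the pipeline.
let stage name (work: Async<'a>) (rest: 'a -> Async<Choice<'b, string * exn>>) =
    async {
        let! outcome = work |> Async.Catch
        match outcome with
        | Choice1Of2 value -> return! rest value
        | Choice2Of2 err -> return Choice2Of2 (name, err)
    }

// Without a builder, every stage nests inside the continuation of the previous one.
let etl extractf transformf loadf =
    stage "extract" (extractf ()) (fun extracted ->
        stage "transform" (transformf extracted) (fun transformed ->
            stage "load" (loadf transformed) (fun loaded ->
                async { return Choice1Of2 loaded })))
```

Each additional stage adds another level of nesting and another lambda, which is exactly the plumbing that a computation expression builder can generate for you.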

Listing 16. ETL computation expression

module Etl =

    let stage name f rest =
        async {
            let! f = f |> Async.Catch
            match f with
            | Choice1Of2 r ->
                let! result = rest r
                return result
            | Choice2Of2 e -> return Choice2Of2 (name,e)
        }

    let ret f = async { return Choice1Of2 f }

    let fail f = async { return Choice2Of2 f }

    type EtlBuilder() =
        member x.Bind((name,f), rest) = stage name f rest
        member x.Return(f) = ret f
        member x.ReturnFrom(f) : Async<Choice<_,_>> = f

    let toAsync successF compensation comp =
        async {
            let! result = comp
            match result with
            | Choice1Of2 r -> return successF r
            | Choice2Of2 (name,err) -> return compensation name err
        }

let etl = Etl.EtlBuilder()

Listing 16 defines a computation builder that allows you to compose various ETL functions without having to worry about the awkward wiring introduced when you made your stages asynchronous.

Listing 17. Using the ETL computation builder

let etl extractf transformf loadf =
    etl {
        let! extracted = "extract",extractf
        let! transformed = "transform",transformf extracted
        let! result = "load",loadf transformed
        return result
    }

This is far simpler and much cleaner than the previous implementation, because all the nasty boilerplate has been removed. As an added bonus, because you can arbitrarily compose functions, you no longer need the enclosing function; listing 18 shows an asynchronous version of the file converter from listing 13.

Listing 18. Asynchronous file converter

let readFileAsync path =
    async {
        use fs = File.OpenRead(path)
        use sr = new StreamReader(fs)
        return! sr.ReadToEndAsync() |> Async.AwaitTask
    }

let parseCsvAsync (input:string) =
    async {
        return split Environment.NewLine input |> Array.map (split ",")
    }

let writeJsonAsync path input =
    async {
        let! serialized =
            JsonConvert.SerializeObjectAsync(input)
            |> Async.AwaitTask
        use fs = File.Open(path, FileMode.Create,
                           FileAccess.Write, FileShare.None)
        use sw = new StreamWriter(fs)
        return! sw.WriteAsync(serialized) |> Async.AwaitIAsyncResult
    }

let parseCSVAndSaveAsJson sourcePath sinkPath =
    etl {
        let! extracted = "extractCsv", (readFileAsync sourcePath)
        let! parsedCsv = "parseCSV", (parseCsvAsync extracted)
        do! ("savingJson",
             (writeJsonAsync sinkPath parsedCsv |> Async.Ignore))
    }
    |> Etl.toAsync id (printfn "Error Stage: %s - %A")

parseCSVAndSaveAsJson "data.csv" "data.json" |> Async.RunSynchronously

Now that you can create ETL computation expressions that can compose asynchronous functions together into a workflow, you need a way to run or schedule the run of these computations. In the next section, we look at how to use cron expressions and system events to run these computations.

Exercise 2

In listing 18, you naively extract the entire file upon reading the file. Now suppose the file was gigabytes or terabytes in size: what would have to change in listing 18 to make this work for files that are too large to fit in memory? Your task is to refactor the functions to handle large files.

Scheduling ETL work

In the section “The extract, transform, and load (ETL) model,” you created an ETL workflow. With this workflow, you can create delayed asynchronous expressions that can represent any given ETL task. You now need a way to run these tasks. There are two types of data sources: pull and push. Examples of pull data sources include reading from a file, running a query against a database, and extracting a page from a website. Push data sources include a subscription to a message bus topic, responding to data received on a network channel, and even UI events.

Earlier we talked about building on event streams and the advantages of that approach, so let’s try to capitalize on that in our ETL work. The only problem is that push data sources are rare in the real world. This raises the question: how can you turn a pull data source into a push data source? The answer is simple: you schedule a file extraction to be run at given intervals, and then, as each interval passes, you raise an event to trigger some work. Okay, so it isn’t a push data source in the true sense, but it’s close enough for you to be able to exploit some of the nice properties of an event-driven architecture.

Version 4 of the .NET Framework introduced the IObservable<'a> and IObserver<'a> interfaces. These interfaces provide the basis for push-based notifications based on the Observer pattern. IObservable<'a> provides a single method, Subscribe, which allows an observer to observe the data source. Upon subscribing, the Subscribe method returns an instance of IDisposable, which can be used later to stop that instance of the observer from receiving notifications. The IObserver<'a> interface provides the means to send notifications. This interface can raise three types of notifications:

· Data: Notifies the observer of the next event (IObserver<'a>.OnNext)

· Error: Notifies the observer that an error occurred (IObserver<'a>.OnError)

· Completed: Notifies the observer that the stream has completed (IObserver<'a>.OnCompleted)
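As a small illustration of these two interfaces, the sketch below builds a hypothetical observable, ofList, that pushes the items of a list to each subscriber and then completes, together with an observer that records every notification it receives. Both are written as object expressions; neither is part of the chapter’s code.

```fsharp
open System

// Hypothetical observable: pushes each list item, then signals completion.
let ofList (items: 'a list) =
    { new IObservable<'a> with
        member x.Subscribe(observer) =
            for item in items do
                observer.OnNext item
            observer.OnCompleted()
            // Nothing to cancel here, so disposing the subscription is a no-op.
            { new IDisposable with
                member y.Dispose() = () } }

// Records the notifications in the order they arrive.
let received = ResizeArray<string>()

let observer =
    { new IObserver<int> with
        member x.OnNext(value) = received.Add(sprintf "next %d" value)
        member x.OnError(err) = received.Add("error " + err.Message)
        member x.OnCompleted() = received.Add("completed") }

// Subscribe returns an IDisposable that ends the subscription.
let subscription = (ofList [1; 2]).Subscribe(observer)
subscription.Dispose()
```

After this runs, received holds "next 1", "next 2", and "completed", in that order.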

Because the framework already provides these interfaces, it makes sense to use them, but you still need a way to indicate when you want to raise these events. You could use a TimeSpan object, but doing so isn’t flexible because you can’t exclude certain days, dates, months, or years. Another option is to use a precomputed list of date-time structures, but this approach isn’t scalable or robust. A better option is to use cron expressions, as you’ll see next.

Cron expressions

Cron is a scheduling utility built into *nix-based operating systems. A cron expression is the specification that defines when something should happen. Cron expressions are a powerful way of representing schedules. You can use a cron expression to define almost any schedule, such as

0 0 1 14,18 JAN,MAR,SEP ? 2014

which, read field by field as seconds, minutes, hours, day of month, month, day of week, and year, yields the following dates:

· Tuesday, January 14, 2014 1:00 AM

· Saturday, January 18, 2014 1:00 AM

· Friday, March 14, 2014 1:00 AM

· Tuesday, March 18, 2014 1:00 AM

· Sunday, September 14, 2014 1:00 AM

· Thursday, September 18, 2014 1:00 AM

Parsing cron expressions is out of the scope of this chapter. But assuming you can parse a cron expression to a seq<DateTime>, you can then asynchronously evaluate this sequence and raise events on an observer.

Listing 19. Turning a sequence of date-times into an observable
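The code for this listing is not reproduced in this text. One possible shape for it is sketched below. The function name ofSchedule is hypothetical, and the sketch assumes the sequence is in ascending order and that each wait fits in an Int32 number of milliseconds.

```fsharp
open System
open System.Threading

// Sketch only: turns a schedule into an observable that fires at each date-time.
let ofSchedule (schedule: seq<DateTime>) =
    { new IObservable<DateTime> with
        member x.Subscribe(observer) =
            let cts = new CancellationTokenSource()
            let work =
                async {
                    try
                        for due in schedule do
                            // Sleep until the next scheduled time; times already
                            // in the past fire immediately.
                            let delay = due - DateTime.Now
                            if delay > TimeSpan.Zero then
                                do! Async.Sleep(int delay.TotalMilliseconds)
                            observer.OnNext due
                        observer.OnCompleted()
                    with e ->
                        observer.OnError e
                }
            // Each subscriber gets its own background loop.
            Async.Start(work, cts.Token)
            // Disposing the subscription cancels that loop.
            { new IDisposable with
                member y.Dispose() = cts.Cancel() } }
```

Starting the loop inside Subscribe, rather than when the observable is created, means that no notifications are raised before someone is listening.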

Now you can schedule your convert CSV task from the previous section, as shown in the following listing.

Listing 20. Scheduling a task

So far you’ve seen how to schedule a task using cron expressions and how to chain together different stages of an ETL task in a composable way. In the trivial examples you’ve seen so far, directly chaining together the stages has been acceptable, because they all take approximately the same amount of time to complete. But if you’re in a situation where a stage may take a long time to complete, then you have a problem, because you’ll end up getting a backlog of work that could keep growing until the process runs out of memory. Pipelining stages will help solve this problem, and in the next section you’ll see how pipelines work.

Implementing ETL pipeline workers

Pipelines are useful if you want to process a large number of inputs in parallel and that processing consists of a set of steps that must be executed. Figure 1 shows a processing pipeline.

Figure 1. Simple processing pipeline

In a pipeline, each step is separated by a queue of fixed length. When the queue fills up, it blocks any more work from being placed in the queue. This avoids a potential runaway memory problem that can occur if there’s a large differential in the execution time between the current stage and the next.

To implement a pipelined version of your ETL workflow, you can introduce an agent that has a bounded queue that blocks when the queue is full, thus preventing any more messages from being passed to it until it has a free slot.
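The blocking behavior of a fixed-length queue can be seen in isolation with .NET’s BlockingCollection<'a>. The sketch below uses TryAdd, which reports failure instead of blocking, so that it can run on a single thread; the capacity of 2 is an arbitrary value chosen for the example.

```fsharp
open System.Collections.Concurrent

// A queue that holds at most two items.
let queue = new BlockingCollection<int>(2)

let first = queue.TryAdd(1)    // true: there is room
let second = queue.TryAdd(2)   // true: the queue is now full
let third = queue.TryAdd(3)    // false: full, so Add would block at this point

// Taking an item frees a slot, so the producer can make progress again.
let taken = queue.Take()
let retry = queue.TryAdd(3)    // true
```

With Add in place of TryAdd, the third call would simply wait until a consumer takes an item, which is how a slow stage slows down the stage that feeds it.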

Listing 21. An agent with a bounded queue

Exercise 3

In listing 21, you created an agent with a different behavior to support a specific requirement. There’s a myriad of other agents you can create. Your task is to create an agent that allows users from any thread to update the UI.

In listing 21, you created two types: ReplyChannel<'a> and BoundedAgent<'a>. ReplyChannel<'a> provides functionality identical to that of AsyncReplyChannel<'a> defined in the FSharp.Core library. But as previously mentioned, that type is sealed and its constructor is internal, so you can’t reuse it. Fortunately, the System.Threading.Tasks namespace provides the TaskCompletionSource<'a> type, which allows you to wait on a result from an asynchronous task. All you do in listing 21 is provide a wrapper around this type. With this in hand, you can then implement the BoundedAgent<'a> type. At the heart of the bounded agent is BlockingCollection<'a>, which lets you wrap other collections (that provide an implementation of the IProducerConsumerCollection<'a> interface) to add blocking behavior. That way, you can add first-in, first-out (FIFO) semantics, for example, by wrapping a concurrent queue. But because agents can process messages in any order, the simple blocking collection suffices in this case.
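The code for listing 21 is not reproduced in this text. The sketch below shows how the two types described above could fit together. It is deliberately simplified and rests on several assumptions: the agent does not inherit AgentRef<'a> so that the example stays self-contained, the reply never times out, and Receive blocks a thread-pool thread while the queue is empty.

```fsharp
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks

// Same shape as the reply-channel interface introduced in listing 3.
type IAsyncReplyChannel<'a> =
    abstract Reply : 'a -> unit

// A reply channel backed by TaskCompletionSource.
type ReplyChannel<'a>() =
    let source = TaskCompletionSource<'a>()
    member x.WaitResult = source.Task |> Async.AwaitTask
    interface IAsyncReplyChannel<'a> with
        member x.Reply(msg) = source.TrySetResult(msg) |> ignore

// Sketch only: an agent whose mailbox holds at most `limit` messages.
type BoundedAgent<'a>(id: string, limit: int,
                      comp: BoundedAgent<'a> -> Async<unit>,
                      ?token: CancellationToken) =
    let token = defaultArg token CancellationToken.None
    let queue = new BlockingCollection<'a>(limit)
    member x.Id = id
    // Blocks the caller while the queue is full.
    member x.Post(msg: 'a) = queue.Add(msg, token)
    // Blocks while the queue is empty.
    member x.Receive() = async { return queue.Take(token) }
    member x.PostAndTryAsyncReply(builder: IAsyncReplyChannel<'b> -> 'a) : Async<'b option> =
        async {
            let channel = ReplyChannel<'b>()
            x.Post(builder (channel :> IAsyncReplyChannel<'b>))
            let! reply = channel.WaitResult
            return Some reply
        }
    member x.Start() = Async.Start(comp x, token)

// Usage: a hypothetical agent that doubles the numbers it is sent.
let doubler =
    BoundedAgent<int * IAsyncReplyChannel<int>>("doubler", 2, fun agent ->
        let rec loop () =
            async {
                let! (n, reply) = agent.Receive()
                reply.Reply(n * 2)
                return! loop ()
            }
        loop ())

doubler.Start()

let answer =
    doubler.PostAndTryAsyncReply(fun rc -> (21, rc))
    |> Async.RunSynchronously
```

A fuller version would inherit AgentRef<'a> so that the agent can be registered and resolved like any other, and would return None from PostAndTryAsyncReply when a reply does not arrive within a timeout.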

Now that you’ve defined your agent, you can begin to integrate it into your ETL functions. But you have a problem: instead of passing the result of one asynchronous computation to the input of the other, you have to take the reply message from one agent and post the message to the next agent. To fix this, you can introduce a function (see listing 22) that takes an agent and the previous part of the computation and converts it into string * Async<'a>, which is what the stage function defined in listing 16 expects.

Listing 22. Tying together agents

type Replyable<'request, 'reply> =
    | Reply of 'request * IAsyncReplyChannel<'reply>

let pipelined (agent:AgentRef<_>) previous =
    agent.Id, async {
        let! result =
            agent.PostAndTryAsyncReply(fun rc -> Reply(previous,rc))
        match result with
        | Some(result) ->
            match result with
            | Choice1Of2(result) -> return result
            | Choice2Of2(err) -> return raise(err)
        | None -> return failwithf "Stage timed out %s: failed" agent.Id
    }

Listing 22 takes a reference to an agent and creates an asynchronous computation that wraps the call to PostAndTryAsyncReply. To support this, you need to define a message type for your agents, because you need a common way of responding using the reply channel you built earlier. To enable this, you create the Replyable<'request,'reply> type. This unfortunately has the side effect of restricting your agents to accept this message, but this isn’t as bad as it might seem. The type Replyable<'request,'reply> doesn’t impose any constraints on the request or reply payloads. Implementing an agent that can accept this message is simple, as shown in the following listing.

Listing 23. Implementing a pipeline-able agent

let bounded name limit comp =
    BoundedAgent<_>(name, limit, fun (ref:AgentRef<_>) ->
        let rec loop (ref:AgentRef<_>) =
            async {
                let! Reply(msg, reply) = ref.Receive()
                let! result = comp msg |> Async.Catch
                do reply.Reply(result)
                return! loop ref
            }
        loop ref) |> Agent.spawn

Listing 23 creates a bounded agent that accepts the Replyable<'request, 'reply> message type. When a message is received, it’s unpacked, and the payload is passed to the computation for processing. Once you have the result of the computation, it’s passed to the reply channel. The agent then loops to await the next message.
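If you want to experiment with this request/reply pattern without the chapter's agent framework, the same idea can be sketched with the MailboxProcessor that ships in FSharp.Core. This is an approximation under stated assumptions: the built-in AsyncReplyChannel stands in for IAsyncReplyChannel, the agent is unbounded, and the names replying and ask are hypothetical helpers rather than part of the chapter's code.

```fsharp
// Stand-in message type: the built-in AsyncReplyChannel replaces the
// chapter's IAsyncReplyChannel, and the reply carries the Async.Catch result.
type Replyable<'request, 'reply> =
    | Reply of 'request * AsyncReplyChannel<Choice<'reply, exn>>

// Wraps an asynchronous computation in an agent that answers every request.
let replying (comp: 'request -> Async<'reply>) =
    MailboxProcessor<Replyable<'request, 'reply>>.Start(fun inbox ->
        let rec loop () =
            async {
                let! (Reply(request, channel)) = inbox.Receive()
                // Async.Catch turns an exception into Choice2Of2,
                // so a failing request does not kill the agent.
                let! result = comp request |> Async.Catch
                channel.Reply(result)
                return! loop ()
            }
        loop ())

// Posts a request and unpacks the three possible outcomes:
// a value, an exception raised by the computation, or a timeout.
let ask (agent: MailboxProcessor<Replyable<'request, 'reply>>)
        (timeoutMs: int) (request: 'request) =
    async {
        let! outcome =
            agent.PostAndTryAsyncReply((fun rc -> Reply(request, rc)), timeoutMs)
        match outcome with
        | Some(Choice1Of2 value) -> return Ok value
        | Some(Choice2Of2 err) -> return Error(err.Message)
        | None -> return Error "timed out"
    }

// Usage: an agent that divides 100 by whatever it is sent.
let divider = replying (fun n -> async { return 100 / n })
let good = ask divider 1000 4 |> Async.RunSynchronously
let bad = ask divider 1000 0 |> Async.RunSynchronously
```

Here good holds the quotient, and bad holds an Error because the division by zero is caught inside the agent. The agent keeps running and can serve further requests.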

With all this in place, you can implement a pipelined version.

Listing 24. Pipelined file converter

let writeOutput (path,input) = async {
    let! serialised =
        JsonConvert.SerializeObjectAsync(input)
        |> Async.AwaitTask
    use fs =
        File.Open(path, FileMode.Create,
                  FileAccess.Write, FileShare.None)
    use sw = new StreamWriter(fs)
    return!
        sw.WriteAsync(serialised)
        |> Async.AwaitIAsyncResult
        |> Async.Ignore }

let fileExtractor =
    Agent.bounded "extractCsv" 5 readFileAsync

let parser =
    Agent.bounded "parseCsv" 5 parseCsvAsync

let saveFile =
    Agent.bounded "saveFile" 5 writeOutput

let agentWorker sourcePath sinkPath =
    etl {
        let! extracted = Agent.pipelined fileExtractor sourcePath
        let! parsedCsv = Agent.pipelined parser extracted
        do! Agent.pipelined saveFile (sinkPath,parsedCsv) }
    |> Etl.toAsync id (printfn "Error Stage: %s - %A")

Fundamentally you haven’t changed your ETL pipeline; you wrapped the existing functions into an agent and prefixed each of those agents with the pipelined function. This approach allows you to reuse all the functionality you already had.

Putting a system together

Earlier we focused on individual aspects of asynchronous and agent-based programming. This is all well and good, but you could focus so much on understanding these primitives that you lose sight of how they relate to real-world systems. In this section, you’ll build a scalable information system that extracts forecast and observational power data from a UK power system operator and combines it into various views.

Introducing scalable systems

Scalability is a system’s ability to improve its overall performance when more resources are added. This improvement can be achieved by adding resources to a single piece of hardware (vertical scaling) or adding more pieces of commodity hardware (horizontal scaling). Each scaling strategy has its pros and cons. Vertical scaling is bounded by Moore’s law, and horizontal scaling can make “true” consistency difficult to achieve due to nonzero latency between nodes. How you should scale your application depends on properties that are desirable for your system so that it can meet your business requirements. This section outlines several techniques that are generally applicable and that let you build a scalable system using F#.

Building on event streams

Event streams provide a continuous set of events. Given an initial application state, you can sequentially apply the events from the stream to change the state of the application to a new state. Representing the changes to application state as a sequence of events is known as event sourcing. Event sourcing allows a system to have its state reconstructed to any point by replaying the relevant events. But what’s an event? An event is an immutable, one-way record of something that has happened. It should represent something of business significance to the application (see listing 25). For example, “Fuel generation data was extracted” could be an event in your ETL system.

Listing 25. Representing an event

type Event<'evntType, 'payload> = {
    Id : Guid
    EventType : 'evntType
    Payload : 'payload
    Timestamp : DateTimeOffset
    MetaData : IDictionary<string, obj>
}
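Replaying events to reconstruct state amounts to a fold over the event stream. The following is a minimal sketch under stated assumptions: StoredEvent is a cut-down stand-in for the record in listing 25, and the Reading payloads, apply, replay, and replayUntil are hypothetical names invented for this illustration.

```fsharp
open System

// Simplified stand-in for the chapter's Event record (no type tag or metadata).
type StoredEvent<'payload> =
    { Id : Guid
      Payload : 'payload
      Timestamp : DateTimeOffset }

// Hypothetical payloads for a running total of generated energy.
type Reading =
    | Generated of float      // MWh added by a new reading
    | Corrected of float      // signed adjustment to an earlier reading

// Applying one event to the current state yields the next state.
let apply (total: float) (evt: StoredEvent<Reading>) =
    match evt.Payload with
    | Generated mwh -> total + mwh
    | Corrected delta -> total + delta

// Replaying is a fold from the initial state over the stored events.
let replay (initial: float) (events: seq<StoredEvent<Reading>>) =
    Seq.fold apply initial events

// State as of a point in time: replay only the events up to that time.
let replayUntil (cutoff: DateTimeOffset) (initial: float)
                (events: seq<StoredEvent<Reading>>) =
    events
    |> Seq.filter (fun evt -> evt.Timestamp <= cutoff)
    |> replay initial

// Usage: three events, half an hour apart.
let start = DateTimeOffset(2015, 1, 1, 0, 0, 0, TimeSpan.Zero)
let stored =
    [ Generated 10.0, 0.0
      Corrected (-2.0), 30.0
      Generated 5.0, 60.0 ]
    |> List.map (fun (payload, minutes) ->
        { Id = Guid.NewGuid()
          Payload = payload
          Timestamp = start.AddMinutes(minutes) })

let current = replay 0.0 stored
let afterCorrection = replayUntil (start.AddMinutes(30.0)) 0.0 stored
```

Because the events themselves are never modified, the state at any earlier moment can be recomputed simply by changing the cutoff.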

What’s the point of storing all intermediate events when you can just store the final state? Well, a few interesting features come out of storing the individual events:

· Storage —It’s trivial to store events. Because events should be immutable, all you need to do is serialize them and then append them to a file. If you need to query, consider a fully fledged event store or a relational database management system (RDBMS). If you choose an RDBMS, the data model is often far simpler than the conventional fully normalized data model.

· Error recovery —If a worker node goes down in your cluster, you replay all the events back through the node when it comes back up, to restore the state.

· Fault tolerance —Because events can be multiplexed, scaling is made a lot simpler; you distribute the workers on multiple machines connected to a message bus and broadcast events to all the workers.

· Trivial precomputation —Because an individual event completely encapsulates the potential change to the state, it’s easy to precompute a read model by applying the event and then projecting the state to the read model.

The last bullet mentions a read model. This is a term that arises from the Command Query Responsibility Segregation (CQRS) pattern. CQRS and event sourcing are often mentioned together, but in no way does one imply the other. The idea behind CQRS is that reads and writes to a data store are separated (figure 2). As a simple example, a UI might create an event or a command, which is written to the event store. The event store then publishes the stored event. The read side subscribes to the stored event and applies the event to some application state (which may or may not cause a change). If the event does cause a change, the read model is updated and persisted in the read store. The read model is often some sort of denormalized store, where the model is optimized so that reads of the application can respond to queries as quickly as possible.

Figure 2. CQRS interfaces
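The separation between the write side and the read side can be sketched in memory. Everything in this example is an assumption made for illustration: ReadingStored, EventStore, and TotalsReadModel are hypothetical types, the store keeps events in a list rather than persisting them, and publishing uses an ordinary F# event rather than a message bus.

```fsharp
open System.Collections.Generic

// Hypothetical event: a generation reading for one fuel type.
type ReadingStored = { FuelType : string; Megawatts : float }

// Write side: an append-only store that publishes each event once it is stored.
type EventStore() =
    let log = ResizeArray<ReadingStored>()
    let stored = Event<ReadingStored>()
    member this.Published = stored.Publish
    member this.Count = log.Count
    member this.Append(evt: ReadingStored) =
        log.Add(evt)
        stored.Trigger(evt)

// Read side: a denormalized view keyed by fuel type, so a query is one lookup.
type TotalsReadModel() =
    let totals = Dictionary<string, float>()
    member this.Apply(evt: ReadingStored) =
        let current =
            match totals.TryGetValue(evt.FuelType) with
            | true, value -> value
            | false, _ -> 0.0
        totals.[evt.FuelType] <- current + evt.Megawatts
    member this.TotalFor(fuelType: string) =
        match totals.TryGetValue(fuelType) with
        | true, value -> Some value
        | false, _ -> None

let store = EventStore()
let readModel = TotalsReadModel()

// The read side subscribes to stored events; the write side knows nothing about it.
store.Published.Add(fun evt -> readModel.Apply(evt))

store.Append({ FuelType = "wind"; Megawatts = 3.0 })
store.Append({ FuelType = "gas"; Megawatts = 5.0 })
store.Append({ FuelType = "wind"; Megawatts = 1.5 })

let windTotal = readModel.TotalFor("wind")
```

Queries never touch the event log. They read the precomputed totals, which is why the read model can be shaped for whatever question the application asks most often.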

Designing for scalability

When developing applications that need to scale, the closer you get to the hardware, the harder you have to work to get performance gains. This means for an application to scale successfully, it must be designed to do so from the beginning. For an application to have a chance at being scalable, it must do the following:

· Minimize wait times —Use asynchronous programming to avoid blocking execution paths and allow other parts of the application to continue running.

· Commute —Actions can be applied in any order and still produce the same result.

· Minimize resource contention —Processes shouldn’t fight to update a single file.

· Keep computations pure —Actions with side effects (such as persisting data) should occur only on the interfaces of the application.

· Minimize dependencies —Dependencies make applications more complex and can create bottlenecks when one side of the dependency takes significantly longer than the other to complete its task.

Combining the tenets of event sourcing and CQRS, along with implementing agents, goes a long way toward satisfying all of these properties. But a large portion of systems in the world aren’t event driven. In the next section, you’ll see how to extract data from the world and enable this event-driven approach to use asynchronous systems.

Implementing the system

The example system will be based on UK power generation. You’ll take half-hourly fuel generation data published by ELEXON (the company that manages the UK’s power-trading exchange), transform it into a type, and then compute some statistics (totals and averages) over this data. Although this isn’t a complex problem, it highlights how you can build and compose asynchronous workflows and agents to create a scalable solution to problems.

To start, in the following listing you define some primitive types that you’ll need to represent the application domain.

Listing 26. Representing your domain

In this extremely simple domain, notice that the type FuelTypeStats has some interesting members on it, including Zero and (+). The existence of these members points to the fact that the type may be monoidal[3] as long as it satisfies the three monoid laws of associativity, closure, and identity. Because this type contains only other monoidal types, your type is monoidal too. This subject is out of the scope of this chapter, but suffice to say that the fact that this type is monoidal will make it easy to compute many things over this data without having to think too hard about it.

3 Scott Wlaschin, “The ‘Understanding Monoids’ Series,” F# for Fun and Profit, http://mng.bz/n7JZ.
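To see what these members buy you, consider a small type that follows the same pattern. The Stats record below is a hypothetical stand-in, not the type from listing 26; it only assumes that the statistics are built from fields that can themselves be added.

```fsharp
// Hypothetical stand-in for a statistics type; not the chapter's definition.
type Stats =
    { Count : int
      Total : float }
    // Identity: combining any value with Zero leaves it unchanged.
    static member Zero = { Count = 0; Total = 0.0 }
    // Closure: combining two Stats always yields another Stats.
    static member (+) (a: Stats, b: Stats) =
        { Count = a.Count + b.Count; Total = a.Total + b.Total }

let a = { Count = 1; Total = 10.0 }
let b = { Count = 2; Total = 4.5 }
let c = { Count = 3; Total = 0.5 }

// Spot checks of the laws on sample values (chosen so the float sums are exact).
let leftIdentity = (Stats.Zero + a = a)
let rightIdentity = (a + Stats.Zero = a)
let associative = ((a + b) + c = a + (b + c))

// Because Zero and (+) exist, the generic Seq.sum works on Stats directly.
let total = Seq.sum [ a; b; c ]
```

Checking three values doesn't prove the laws, and floating-point addition is only approximately associative in general, but the sketch shows why a record made of addable fields can be summed without any extra plumbing.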

Now that you’ve defined your domain, you can define how you’re going to transform the data.

Listing 27. Extracting and transforming the data

Listing 27 creates an ETL pipeline that stores the result from each of the stages in an event store. To extract the data, you decide where to source the data based on the trigger type passed into the pipeline (more on this later). Once the raw data is extracted, you pass this data to the transform stage, which then uses a CSV type provider to parse and convert the data into your domain type. You again store this transformed data in the event store. The final stage is passing the transformed data to a function, which can dispatch to your waiting models.

Now that you have some data, the next thing to do is implement some models.

Listing 28. Implementing the models

In this application, the model computations live within agents. You can abstract most of the implementation of an agent away into a single function. This function takes a name, an event type, a cancellation token, and a computation. In terms of business relevance for this application, it’s this computation that’s important. As explained earlier, because your type implements certain members, you can use built-in F# functions (Seq.sum and Seq.average) to handle the computation. Because FuelStatsType is in fact a monoid, the statistics functions used in listing 28 would be almost trivial to define if they didn’t exist in the F# core library (see listing 29).

Listing 29. Sum and average without Seq.sum and Seq.average

Notice when computing the average that there is no divisor. Instead you use the DivideByInt member on FuelStatsType and a fold to count and sum the individual instances of FuelStatsType, giving the equivalent of Seq.average.
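The idea can be sketched with a hypothetical stand-in type. The Stats record and the sum and average functions below are assumptions made for illustration, not a reproduction of listing 29; the only requirement is that the type exposes Zero, (+), and DivideByInt.

```fsharp
// Hypothetical stand-in type; the chapter's own statistics type is not reproduced here.
type Stats =
    { Megawatts : float }
    static member Zero = { Megawatts = 0.0 }
    static member (+) (a: Stats, b: Stats) =
        { Megawatts = a.Megawatts + b.Megawatts }
    static member DivideByInt (a: Stats, n: int) =
        { Megawatts = a.Megawatts / float n }

// Sum: fold the monoid's (+) over the items, starting from Zero.
let sum (items: seq<Stats>) = Seq.fold (+) Stats.Zero items

// Average: a single fold accumulates both the running sum and the count,
// then the division is delegated to DivideByInt.
let average (items: seq<Stats>) =
    let total, count =
        items
        |> Seq.fold (fun (acc: Stats, n) item -> acc + item, n + 1) (Stats.Zero, 0)
    if count = 0 then invalidArg "items" "The input sequence was empty."
    Stats.DivideByInt(total, count)

// Usage
let readings = [ { Megawatts = 1.0 }; { Megawatts = 2.0 }; { Megawatts = 6.0 } ]
let summed = sum readings
let averaged = average readings
```

On these readings, both functions agree with the built-in Seq.sum and Seq.average, which rely on the same three members.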

You now have a way to extract and transform the raw data into a domain representation and a way to model the data. The next listing shows how to create triggers to kick off this process.

Listing 30. Triggering your workflow

module Triggers =

    open System
    open System.IO
    open FSharpDeepDives
    open FSharpDeepDives.ExampleApp

    let private ensurePath path =
        let di = new DirectoryInfo(path)
        di.Create(); di.FullName

    let file path =
        let watcher = new FileSystemWatcher(ensurePath path)
        watcher.EnableRaisingEvents <- true
        watcher.Created
        |> Observable.map (fun x -> Trigger.File(x.FullPath))

    let cron cronExp =
        Schedule.toObservable cronExp
        |> Observable.map (fun _ -> Trigger.Cron)

Listing 30 creates two triggers: a file trigger that watches a folder for newly created files, and a cron-based trigger that fires at intervals that match the cron expression. When each trigger is fired, you map the event arguments into the Trigger type defined in listing 26.
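The mapping step can be tried in isolation, without a file system watcher or a scheduler. In this sketch, ordinary F# events stand in for FileSystemWatcher.Created and for the cron schedule, and the Trigger union is a hypothetical stand-in for the type in listing 26. Merging the two streams with Observable.merge is one possible way to combine them, not necessarily how the chapter's entry point does it.

```fsharp
// Hypothetical stand-in for the Trigger type from listing 26.
type Trigger =
    | File of string
    | Cron

// Ordinary F# events stand in for the watcher and the cron schedule.
let fileCreated = Event<string>()
let cronTick = Event<unit>()

// Each source is mapped into the common Trigger type...
let fileTrigger = fileCreated.Publish |> Observable.map Trigger.File
let cronTrigger = cronTick.Publish |> Observable.map (fun () -> Trigger.Cron)

// ...so the sources can be merged into a single stream that starts the workflow.
let triggers = Observable.merge fileTrigger cronTrigger

// Record every trigger that fires; a real subscriber would run the pipeline.
let fired = ResizeArray<Trigger>()
let subscription = triggers |> Observable.subscribe (fun t -> fired.Add(t))

fileCreated.Trigger("data/fuel.csv")
cronTick.Trigger(())
```

Because every source ends up as the same Trigger type, the code downstream doesn't need to know whether a run was started by a new file or by the clock.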

The only thing left to do is to tie all this together, as shown in the next listing. This will be the entry point to your application.

Listing 31. Bringing it all together

Notice the extensibility you’ve achieved here. For example, if you want another statistic computed, you can write the function, wrap it in an agent, and add it to the list of model definitions. Or if you want another trigger, you can add it to the list of triggers. It shouldn’t be hard to see how you could take this application even further and make it almost entirely data driven, by lifting those two lists into configuration files.

Going beyond simple agents

Everything you’ve seen so far has been in terms of agents and asynchronous programming in a single process. This needn’t be the case; the message passing–based nature of agents makes them almost trivial to extend into a message bus–oriented architecture. For example, you could create an agent whose receive method listens to an exchange on a message bus and whose post method places messages onto another exchange to which other agent instances subscribe. This opens up a world of possibilities in terms of load balancing, clustering, and resilience.

Summary

At the start of this chapter, we introduced F# asynchronous workflows and showed you how to compose these workflows together into larger, more complex workflows with ease. You then learned about agents and built a framework that helps you manage agents in large systems. From there you moved on to building extract, transform, and load (ETL) pipelines; you started with a simple example and built it up in stages until you had a fully fledged pipeline that allows you to balance throughput against memory usage and performance. You also saw how to schedule pipelines to run using cron expressions. Finally, you built an application that extracts UK fuel-generation data, loads the results into a set of models that compute statistics over that data, and stores each action and computation result as an event.

About the author

Colin Bull holds a master’s degree in physics from the University of Birmingham and is currently working as a consultant software developer/technical architect in the UK commodity trading sector, where he has implemented several commercial solutions in F#.

In his spare time, Colin enjoys learning about new technologies, especially ones related to functional programming, and contributes to several F# community open source projects. Colin also maintains a blog at http://colinbul.wordpress.com.