Single-Process Pub/Sub - Examples - Developing Web Apps with Haskell and Yesod, Second Edition (2015)

Part III. Examples

Chapter 27. Single-Process Pub/Sub

The example in the previous chapter was admittedly quite simple. Let’s build on that foundation (pun intended) to do something a bit more interesting. Suppose we have a workflow on our site like the following:

1. Enter some information on page X, and submit.

2. Submission starts a background job, and the user is redirected to a page to view the status of that job.

3. That second page will subscribe to updates from the background job and display them to the user.

The core principle here is the ability to let one thread publish updates, and have another thread subscribe to receive those updates. This is known generally as pub/sub, and fortunately is very easy to achieve in Haskell via STM (the Software Transactional Memory library).
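
To make the idea concrete before wiring it into Yesod, here is a minimal, standalone sketch of pub/sub with a broadcast TChan. It uses only the stm package; the names takeN and pubSubDemo are invented for this illustration and do not appear in the application below.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (replicateM)

-- Read n messages from a subscriber's private copy of the channel.
takeN :: Int -> TChan a -> IO [a]
takeN n chan = replicateM n (atomically (readTChan chan))

-- Hypothetical demo: one publisher thread, two independent subscribers.
pubSubDemo :: IO ([String], [String])
pubSubDemo = do
    -- A broadcast channel is write-only; readers must duplicate it.
    broadcast <- newBroadcastTChanIO
    -- Subscribe before publishing: a duplicate only sees later writes.
    sub1 <- atomically (dupTChan broadcast)
    sub2 <- atomically (dupTChan broadcast)
    -- The publisher runs in its own thread.
    _ <- forkIO $
        mapM_ (atomically . writeTChan broadcast) ["one", "two", "three"]
    -- readTChan blocks until a message is available.
    xs <- takeN 3 sub1
    ys <- takeN 3 sub2
    return (xs, ys)

main :: IO ()
main = pubSubDemo >>= print
```

Each subscriber receives its own copy of all three messages, so both lists come back as ["one","two","three"].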

As in the previous chapter, let me start with a caveat: this technique only works properly if you have a single web application process. If you have two different servers and a load balancer, you’ll need either sticky sessions or some other way to make sure that all requests from a single user go to the same machine. In those situations, you may want to consider using an external pub/sub solution, such as Redis.

With that caveat out of the way, let’s get started.

Foundation Data Type

We’ll need two different mutable references in our foundation. The first will keep track of the next “job ID” we’ll hand out. Each of these background jobs will be represented by a unique identifier that will be used in our URLs. The second piece of data will be a map from the job ID to the broadcast channel used for publishing updates. In code:

data App = App
    { jobs    :: TVar (IntMap (TChan (Maybe Text)))
    , nextJob :: TVar Int
    }

Notice that our TChan contains Maybe Text values. The reason for the Maybe wrapper is so that we can indicate that the channel is complete, by providing a Nothing value.
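
As a small illustration of the Nothing convention, the sketch below reads from a channel until the sentinel arrives. For brevity it uses an ordinary TChan and String payloads rather than the broadcast channel of Text used in the application; drain and sentinelDemo are hypothetical names.

```haskell
import Control.Concurrent.STM

-- Collect every payload from the channel until the Nothing sentinel arrives.
drain :: TChan (Maybe a) -> IO [a]
drain chan = do
    next <- atomically (readTChan chan)
    case next of
        Nothing -> return []                 -- producer signalled completion
        Just x  -> fmap (x :) (drain chan)   -- keep the payload, keep reading

sentinelDemo :: IO [String]
sentinelDemo = do
    chan <- newTChanIO
    atomically $ do
        writeTChan chan (Just "first")
        writeTChan chan (Just "second")
        writeTChan chan Nothing
    drain chan

main :: IO ()
main = sentinelDemo >>= print
```

Without the sentinel, a reader would have no way to distinguish "no more messages, ever" from "no message yet", and would block forever on readTChan.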

Allocate a Job

In order to allocate a job, we need to:

1. Get a job ID.

2. Create a new broadcast channel.

3. Add the channel to the channel map.

Due to the beauty of STM, this is pretty easy:

(jobId, chan) <- liftIO $ atomically $ do
    jobId <- readTVar nextJob
    writeTVar nextJob $! jobId + 1
    chan <- newBroadcastTChan
    m <- readTVar jobs
    writeTVar jobs $ IntMap.insert jobId chan m
    return (jobId, chan)
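
Because all three steps run inside a single transaction, two concurrent requests can never be handed the same job ID. The following standalone sketch checks that property outside of Yesod. The helper allocateJob and the driver allocateDemo are hypothetical names introduced here; the body of allocateJob mirrors the snippet above, using modifyTVar' in place of the explicit read and write of the map.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_)
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import Data.List (sort)
import Data.Text (Text)

type Jobs = TVar (IntMap (TChan (Maybe Text)))

-- Hypothetical helper: the three allocation steps as one STM action.
allocateJob :: TVar Int -> Jobs -> STM (Int, TChan (Maybe Text))
allocateJob nextJob jobs = do
    jobId <- readTVar nextJob
    writeTVar nextJob $! jobId + 1
    chan <- newBroadcastTChan
    modifyTVar' jobs (IntMap.insert jobId chan)
    return (jobId, chan)

-- Allocate n jobs from n concurrent threads and return the sorted IDs.
allocateDemo :: Int -> IO [Int]
allocateDemo n = do
    nextJob <- newTVarIO 1
    jobs <- newTVarIO IntMap.empty
    results <- newTVarIO []
    forM_ [1 .. n] $ \_ -> forkIO $ do
        (jobId, _) <- atomically (allocateJob nextJob jobs)
        atomically (modifyTVar' results (jobId :))
    -- Block (via STM retry) until every worker has reported its ID.
    ids <- atomically $ do
        xs <- readTVar results
        check (length xs == n)
        return xs
    return (sort ids)

main :: IO ()
main = allocateDemo 50 >>= print
```

Whatever order the threads run in, the sorted result is the list from 1 to n with no gaps and no duplicates.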

Fork Our Background Job

There are many different ways we could go about this, and they depend entirely on what the background job is going to be. Here’s a minimal example of a background job that publishes a few messages, with a one-second delay before each one. Note how, after our final message, we broadcast a Nothing value and remove our channel from the map of channels:

liftIO $ forkIO $ do
    threadDelay 1000000
    atomically $ writeTChan chan $ Just "Did something\n"
    threadDelay 1000000
    atomically $ writeTChan chan $ Just "Did something else\n"
    threadDelay 1000000
    atomically $ do
        writeTChan chan $ Just "All done\n"
        writeTChan chan Nothing
        m <- readTVar jobs
        writeTVar jobs $ IntMap.delete jobId m
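
A real job can fail partway through, in which case the code above would never send Nothing and subscribers would wait indefinitely. One possible way to guard against that, sketched here as an assumption rather than as part of the chapter’s application, is to put the cleanup step in a finally handler. The names forkJob, drain, and failingJobDemo are hypothetical.

```haskell
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.STM
import Control.Exception (finally, throwIO)
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap

-- Hypothetical wrapper: run a job body in a new thread, and always
-- broadcast Nothing and unregister the channel, even on an exception.
forkJob
    :: TVar (IntMap (TChan (Maybe msg)))
    -> Int
    -> TChan (Maybe msg)
    -> ((msg -> IO ()) -> IO ())   -- job body, given a publish function
    -> IO ThreadId
forkJob jobs jobId chan body =
    forkIO $ body publish `finally` cleanup
  where
    publish = atomically . writeTChan chan . Just
    cleanup = atomically $ do
        writeTChan chan Nothing
        modifyTVar' jobs (IntMap.delete jobId)

-- Read payloads until the Nothing sentinel.
drain :: TChan (Maybe a) -> IO [a]
drain chan = do
    next <- atomically (readTChan chan)
    case next of
        Nothing -> return []
        Just x  -> fmap (x :) (drain chan)

-- A job that publishes one update and then crashes.
failingJobDemo :: IO ([String], Bool)
failingJobDemo = do
    jobs <- newTVarIO IntMap.empty
    chan <- newBroadcastTChanIO
    atomically (modifyTVar' jobs (IntMap.insert 1 chan))
    sub <- atomically (dupTChan chan)
    _ <- forkJob jobs 1 chan $ \publish -> do
        publish "step 1"
        throwIO (userError "simulated failure")
    msgs <- drain sub
    remaining <- readTVarIO jobs
    return (msgs, IntMap.null remaining)

main :: IO ()
main = failingJobDemo >>= print
```

The subscriber still sees the one update followed by the end-of-stream marker, and the map is empty afterwards. The exception itself is still reported on stderr by the forked thread; this sketch only makes sure subscribers are released.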

View Progress

For this demonstration, I’ve opted for a very simple progress view: a plain-text page with a streaming response. There are a few other possibilities here: an HTML page that auto-refreshes every X seconds, or using EventSource or WebSockets. I encourage you to give those a shot as well, but here’s the simplest implementation I can think of:

getViewProgressR jobId = do
    App {..} <- getYesod
    mchan <- liftIO $ atomically $ do
        m <- readTVar jobs
        case IntMap.lookup jobId m of
            Nothing -> return Nothing
            Just chan -> fmap Just $ dupTChan chan
    case mchan of
        Nothing -> notFound
        Just chan -> respondSource typePlain $ do
            let loop = do
                    mtext <- liftIO $ atomically $ readTChan chan
                    case mtext of
                        Nothing -> return ()
                        Just text -> do
                            sendChunkText text
                            sendFlush
                            loop
            loop

We start off by looking up the channel in the map. If we can’t find it, it means the job either never existed, or has already been completed. In either event, we return a 404. (Another possible enhancement would be to store some information on all previously completed jobs and let the user know if the job is done.)
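
The enhancement mentioned in parentheses could be sketched roughly as follows: instead of deleting a finished job from the map, mark it as finished, so a lookup can tell "never existed" apart from "already done". Everything here (JobState, Lookup, subscribe, finish, describe) is a hypothetical design, not code from the application. Note that such a map grows without bound unless old entries are eventually pruned.

```haskell
import Control.Concurrent.STM
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import Data.Text (Text)

-- Hypothetical replacement for the map's value type.
data JobState
    = Running (TChan (Maybe Text))
    | Finished

-- Three possible outcomes of looking up a job.
data Lookup
    = Unknown                          -- no such job: respond with a 404
    | AlreadyDone                      -- job existed and has completed
    | Subscribed (TChan (Maybe Text))  -- private channel for this viewer

subscribe :: TVar (IntMap JobState) -> Int -> STM Lookup
subscribe jobs jobId = do
    m <- readTVar jobs
    case IntMap.lookup jobId m of
        Nothing             -> return Unknown
        Just Finished       -> return AlreadyDone
        Just (Running chan) -> Subscribed <$> dupTChan chan

-- Called by the job at the end, instead of IntMap.delete.
finish :: TVar (IntMap JobState) -> Int -> TChan (Maybe Text) -> STM ()
finish jobs jobId chan = do
    writeTChan chan Nothing
    modifyTVar' jobs (IntMap.insert jobId Finished)

describe :: Lookup -> String
describe Unknown        = "unknown"
describe AlreadyDone    = "already done"
describe (Subscribed _) = "subscribed"

lookupDemo :: IO [String]
lookupDemo = do
    chan <- newBroadcastTChanIO
    jobs <- newTVarIO (IntMap.singleton 1 (Running chan))
    before <- atomically (subscribe jobs 1)
    atomically (finish jobs 1 chan)
    after <- atomically (subscribe jobs 1)
    missing <- atomically (subscribe jobs 2)
    return (map describe [before, after, missing])

main :: IO ()
main = lookupDemo >>= print
```

In a handler, the AlreadyDone case could return a short "job finished" page where the current code returns a 404.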

Assuming the channel exists, we use respondSource to start a streaming response. We then repeatedly call readTChan until we get a Nothing value, at which point we exit (via return ()). Notice that on each iteration, we call both sendChunkText and sendFlush. Without that second call, the user won’t receive any updates until the output buffer completely fills up, which is not acceptable for a real-time update system.
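
One property of dupTChan is worth keeping in mind here: a duplicated channel starts out empty and only receives messages written after the duplication. A viewer who opens the progress page late therefore sees only the remaining updates. The standalone sketch below demonstrates this with String payloads; lateSubscriberDemo is a hypothetical name.

```haskell
import Control.Concurrent.STM

lateSubscriberDemo :: IO [Maybe String]
lateSubscriberDemo = do
    chan <- newBroadcastTChanIO
    -- Published while nobody is subscribed: this message is simply dropped.
    atomically $ writeTChan chan (Just "Did something\n")
    -- The viewer arrives only now.
    sub <- atomically (dupTChan chan)
    atomically $ do
        writeTChan chan (Just "All done\n")
        writeTChan chan Nothing
    first <- atomically (readTChan sub)
    second <- atomically (readTChan sub)
    return [first, second]

main :: IO ()
main = lateSubscriberDemo >>= print
```

The late subscriber receives "All done\n" and then the Nothing sentinel, but never the first message. If viewers need the full history, the job would have to record its past messages somewhere in addition to broadcasting them.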

Complete Application

For completeness, here’s the full source code for this application:

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes       #-}
{-# LANGUAGE RecordWildCards   #-}
{-# LANGUAGE TemplateHaskell   #-}
{-# LANGUAGE TypeFamilies      #-}
{-# LANGUAGE ViewPatterns      #-}
import           Control.Concurrent     (forkIO, threadDelay)
import           Control.Concurrent.STM
import           Data.IntMap            (IntMap)
import qualified Data.IntMap            as IntMap
import           Data.Text              (Text)
import           Yesod

data App = App
    { jobs    :: TVar (IntMap (TChan (Maybe Text)))
    , nextJob :: TVar Int
    }

mkYesod "App" [parseRoutes|
/ HomeR GET POST
/view-progress/#Int ViewProgressR GET
|]

instance Yesod App

getHomeR :: Handler Html
getHomeR = defaultLayout $ do
    setTitle "PubSub example"
    [whamlet|
        <form method=post>
            <button>Start new background job
    |]

postHomeR :: Handler ()
postHomeR = do
    App {..} <- getYesod
    (jobId, chan) <- liftIO $ atomically $ do
        jobId <- readTVar nextJob
        writeTVar nextJob $! jobId + 1
        chan <- newBroadcastTChan
        m <- readTVar jobs
        writeTVar jobs $ IntMap.insert jobId chan m
        return (jobId, chan)
    liftIO $ forkIO $ do
        threadDelay 1000000
        atomically $ writeTChan chan $ Just "Did something\n"
        threadDelay 1000000
        atomically $ writeTChan chan $ Just "Did something else\n"
        threadDelay 1000000
        atomically $ do
            writeTChan chan $ Just "All done\n"
            writeTChan chan Nothing
            m <- readTVar jobs
            writeTVar jobs $ IntMap.delete jobId m
    redirect $ ViewProgressR jobId

getViewProgressR :: Int -> Handler TypedContent
getViewProgressR jobId = do
    App {..} <- getYesod
    mchan <- liftIO $ atomically $ do
        m <- readTVar jobs
        case IntMap.lookup jobId m of
            Nothing -> return Nothing
            Just chan -> fmap Just $ dupTChan chan
    case mchan of
        Nothing -> notFound
        Just chan -> respondSource typePlain $ do
            let loop = do
                    mtext <- liftIO $ atomically $ readTChan chan
                    case mtext of
                        Nothing -> return ()
                        Just text -> do
                            sendChunkText text
                            sendFlush
                            loop
            loop

main :: IO ()
main = do
    jobs <- newTVarIO IntMap.empty
    nextJob <- newTVarIO 1
    warp 3000 App {..}