Rx Basics - Concurrency in C# Cookbook (2014)

Concurrency in C# Cookbook (2014)

Chapter 5. Rx Basics

LINQ is a set of language features that enable developers to query sequences. The two most common LINQ providers are the built-in LINQ to Objects (based on IEnumerable<T>) and LINQ to Entities (based on IQueryable<T>). There are many other providers available, and most providers have the same general structure. Queries are lazily evaluated, and the sequences produce values as necessary. Conceptually, this is a pull model; during evaluation, value items are pulled from the query one at a time.

Reactive Extensions (Rx) treats events as sequences of data that arrive over time. As such, you can think of Rx as LINQ to events (based on IObservable<T>). The main difference between observables and other LINQ providers is that Rx is a “push” model. This means that the query defines how the program reacts as events arrive. Rx builds on top of LINQ, adding some powerful new operators as extension methods.

In this chapter, we’ll look at some of the more common Rx operations. Bear in mind that all of the LINQ operators are also available, so simple operations, such as filtering (Where) and projection (Select), work conceptually the same as they do with any other LINQ provider. This chapter will not cover these common LINQ operations; it focuses on the new capabilities that Rx builds on top of LINQ, particularly those dealing with time.

To use Rx, install the NuGet package Rx-Main into your application. Reactive Extensions has wide platform support (Table 5-1):

Table 5-1. Platform support for Reactive Extensions

Platform

Rx support

.NET 4.5

Yes

.NET 4.0

Yes

Mono iOS/Droid

Yes

Windows Store

Yes

Windows Phone Apps 8.1

Yes

Windows Phone SL 8.0

Yes

Windows Phone SL 7.1

Yes

Silverlight 5

Yes

5.1. Converting .NET Events

Problem

You have an event that you need to treat as an Rx input stream, producing some data via OnNext each time the event is raised.

Solution

The Observable class defines several event converters. Most .NET framework events are compatible with FromEventPattern, but if you have events that don’t follow the common pattern, you can use FromEvent instead.

FromEventPattern works best if the event delegate type is EventHandler<T>. Many newer framework types use this delegate type for events. For example, the Progress<T> type defines a ProgressChanged event, which is of type EventHandler<T>, so it can be easily wrapped with FromEventPattern:

var progress = new Progress<int>();

var progressReports = Observable.FromEventPattern<int>(

handler => progress.ProgressChanged += handler,

handler => progress.ProgressChanged -= handler);

progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs));

Note that the data.EventArgs is strongly typed to be an int. The type argument to FromEventPattern (int in the previous example) is the same as the type T in EventHandler<T>. The two lambda arguments to FromEventPattern enable Rx to subscribe and unsubscribe from the event.

Newer UI frameworks use EventHandler<T> and can easily be used with FromEventPattern, but older types often define a unique delegate type for each event. These can also be used with FromEventPattern, but it takes a bit more work. For example, the System.Timers.Timertype defines an Elapsed event, which is of type ElapsedEventHandler. You can wrap older events like this with FromEventPattern as such:

var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };

var ticks = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(

handler => (s, a) => handler(s, a),

handler => timer.Elapsed += handler,

handler => timer.Elapsed -= handler);

ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));

Note that data.EventArgs is still strongly typed. The type arguments to FromEventPattern are now the unique handler type and the derived EventArgs type. The first lambda argument to FromEventPattern is a converter from EventHandler<ElapsedEventArgs> toElapsedEventHandler; the converter should do nothing more than pass along the event.

That syntax is definitely getting awkward. There is another option, which uses reflection:

var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };

var ticks = Observable.FromEventPattern(timer, "Elapsed");

ticks.Subscribe(data => Trace.WriteLine("OnNext: "

+ ((ElapsedEventArgs)data.EventArgs).SignalTime));

With this approach, the call to FromEventPattern is much easier. However, there are some drawbacks to this approach: there is a magic string ("Elapsed"), and the consumer does not get strongly typed data. That is, data.EventArgs is of type object, so you have to cast it toElapsedEventArgs yourself.

Discussion

Events are a common source of data for Rx streams. This recipe covers wrapping any events that conform to the standard event pattern (where the first argument is the sender and the second argument is the event arguments type). If you have unusual event types, you can still use theObservable.FromEvent method overloads to wrap them into an observable.

When events are wrapped into an observable, OnNext is called each time the event is raised. This can cause surprising behavior when you’re dealing with AsyncCompletedEventArgs, because any exception is passed along as data (OnNext), not as an error (OnError). Consider this example wrapper for WebClient.DownloadStringCompleted:

var client = new WebClient();

var downloadedStrings = Observable.FromEventPattern(client,

"DownloadStringCompleted");

downloadedStrings.Subscribe(

data =>

{

var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs;

if (eventArgs.Error != null)

Trace.WriteLine("OnNext: (Error) " + eventArgs.Error);

else

Trace.WriteLine("OnNext: " + eventArgs.Result);

},

ex => Trace.WriteLine("OnError: " + ex.ToString()),

() => Trace.WriteLine("OnCompleted"));

client.DownloadStringAsync(new Uri("http://invalid.example.com/"));

When WebClient.DownloadStringAsync completes with an error, the event is raised with an exception in AsyncCompletedEventArgs.Error. Unfortunately, Rx sees this as a data event, so if you run this you’ll see “OnNext: (Error)” printed instead of “OnError:.”

Some event subscriptions and unsubscriptions must be done from a particular context. For example, events on many UI controls must be subscribed to from the UI thread. Rx provides an operator will control the context for subscribing and unsubscribing: SubscribeOn. This operator that is not necessary in most situations because most of the time a UI-based subscription is done from the UI thread.

See Also

Recipe 5.2 covers how to change the context in which events are raised.

Recipe 5.4 covers how to throttle events so subscribers are not overwhelmed.

5.2. Sending Notifications to a Context

Problem

Rx does its best to be thread agnostic. So, it will raise its notifications (e.g., OnNext) in whatever thread happens to be present at the time.

However, you often want these notifications raised in a particular context. For example, UI elements should only be manipulated from the UI thread that owns them, so if you are updating a UI in response to a notification, then you’ll need to “move” over to the UI thread.

Solution

Rx provides the ObserveOn operator to move notifications to another scheduler.

Consider this example, which uses the Interval operator to create OnNext notifications once a second:

private void Button_Click(object sender, RoutedEventArgs e)

{

Trace.WriteLine("UI thread is " + Environment.CurrentManagedThreadId);

Observable.Interval(TimeSpan.FromSeconds(1))

.Subscribe(x => Trace.WriteLine("Interval " + x + " on thread " +

Environment.CurrentManagedThreadId));

}

On my machine, the output looks like this:

UI thread is 9

Interval 0 on thread 10

Interval 1 on thread 10

Interval 2 on thread 11

Interval 3 on thread 11

Interval 4 on thread 10

Interval 5 on thread 11

Interval 6 on thread 11

Since Interval is based on a timer (without a specific thread), the notifications are raised on a thread-pool thread, rather than the UI thread. If we need to update a UI element, we can pipe those notifications through ObserveOn and pass a synchronization context representing the UI thread:

private void Button_Click(object sender, RoutedEventArgs e)

{

var uiContext = SynchronizationContext.Current;

Trace.WriteLine("UI thread is " + Environment.CurrentManagedThreadId);

Observable.Interval(TimeSpan.FromSeconds(1))

.ObserveOn(uiContext)

.Subscribe(x => Trace.WriteLine("Interval " + x + " on thread " +

Environment.CurrentManagedThreadId));

}

Another common usage of ObserveOn is to move off the UI thread when necessary. Let’s say we have a situation where we need to do some CPU-intensive computation whenever the mouse moves. By default, all mouse move events are raised on the UI thread, so we can use ObserveOn to move those notifications to a thread-pool thread, do the computation, and then move the result notifications back to the UI thread:

private void Button_Click(object sender, RoutedEventArgs e)

{

var uiContext = SynchronizationContext.Current;

Trace.WriteLine("UI thread is " + Environment.CurrentManagedThreadId);

Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

handler => (s, a) => handler(s, a),

handler => MouseMove += handler,

handler => MouseMove -= handler)

.Select(evt => evt.EventArgs.GetPosition(this))

.ObserveOn(Scheduler.Default)

.Select(position =>

{

// Complex calculation

Thread.Sleep(100);

var result = position.X + position.Y;

Trace.WriteLine("Calculated result " + result + " on thread " +

Environment.CurrentManagedThreadId);

return result;

})

.ObserveOn(uiContext)

.Subscribe(x => Trace.WriteLine("Result " + x + " on thread " +

Environment.CurrentManagedThreadId));

}

If you execute this sample, you’ll see the calculations done on a thread-pool thread and the results printed on the UI thread. However, you’ll also notice that the calculations and results will lag behind the input; they’ll queue up because the mouse move updates more often than every 100 ms. Rx has several techniques for handling this situation; one common one covered in Recipe 5.4 is throttling the input.

Discussion

ObserveOn actually moves notifications to an Rx scheduler. This recipe covered the default (thread pool) scheduler and one way of creating a UI scheduler. The most common uses for the ObserveOn operator are moving on or off the UI thread, but schedulers are useful in other scenarios. We’ll take another look at schedulers when we do some advanced testing in Recipe 6.6.

See Also

Recipe 5.1 covers how to create sequences from events.

Recipe 5.4 covers throttling event streams.

Recipe 6.6 covers the special scheduler used for testing your Rx code.

5.3. Grouping Event Data with Windows and Buffers

Problem

You have a sequence of events and you want to group the incoming events as they arrive. For one example, you need to react to pairs of inputs. For another example, you need to react to all inputs within a two-second window.

Solution

Rx provides a pair of operators that group incoming sequences: Buffer and Window. Buffer will hold on to the incoming events until the group is complete, at which time it forwards them all at once as a collection of events. Window will logically group the incoming events but will pass them along as they arrive. The return type of Buffer is IObservable<IList<T>> (an event stream of collections); the return type of Window is IObservable<IObservable<T>> (an event stream of event streams).

This example uses the Interval operator to create OnNext notifications once a second and then buffers them two at a time:

private void Button_Click(object sender, RoutedEventArgs e)

{

Observable.Interval(TimeSpan.FromSeconds(1))

.Buffer(2)

.Subscribe(x => Trace.WriteLine(

DateTime.Now.Second + ": Got " + x[0] + " and " + x[1]));

}

On my machine, this produces a pair of outputs every two seconds:

13: Got 0 and 1

15: Got 2 and 3

17: Got 4 and 5

19: Got 6 and 7

21: Got 8 and 9

The following is a similar example using Window to create groups of two events:

private void Button_Click(object sender, RoutedEventArgs e)

{

Observable.Interval(TimeSpan.FromSeconds(1))

.Window(2)

.Subscribe(group =>

{

Trace.WriteLine(DateTime.Now.Second + ": Starting new group");

group.Subscribe(

x => Trace.WriteLine(DateTime.Now.Second + ": Saw " + x),

() => Trace.WriteLine(DateTime.Now.Second + ": Ending group"));

});

}

On my machine, this Window example produces output like this:

17: Starting new group

18: Saw 0

19: Saw 1

19: Ending group

19: Starting new group

20: Saw 2

21: Saw 3

21: Ending group

21: Starting new group

22: Saw 4

23: Saw 5

23: Ending group

23: Starting new group

These examples illustrate the difference between Buffer and Window. Buffer waits for all the events in its group and then publishes a single collection. Window groups events the same way, but publishes the events as they come in.

Both Buffer and Window also work with time spans. This is an example where all mouse move events are collected in windows of one second:

private void Button_Click(object sender, RoutedEventArgs e)

{

Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

handler => (s, a) => handler(s, a),

handler => MouseMove += handler,

handler => MouseMove -= handler)

.Buffer(TimeSpan.FromSeconds(1))

.Subscribe(x => Trace.WriteLine(

DateTime.Now.Second + ": Saw " + x.Count + " items."));

}

Depending on how you move the mouse, you should see output like this:

49: Saw 93 items.

50: Saw 98 items.

51: Saw 39 items.

52: Saw 0 items.

53: Saw 4 items.

54: Saw 0 items.

55: Saw 58 items.

Discussion

Buffer and Window are some of the tools we have for taming input and shaping it the way we want it to look. Another useful technique is throttling, which we’ll look at in Recipe 5.4.

Both Buffer and Window have other overloads that can be used in more advanced scenarios. The overloads with skip and timeShift parameters allow you to create groups that overlap other groups or skip elements in between groups. There are also overloads that take delegates, which allow you to dynamically define the boundary of the groups.

See Also

Recipe 5.1 covers how to create sequences from events.

Recipe 5.4 covers throttling event streams.

5.4. Taming Event Streams with Throttling and Sampling

Problem

A common problem with writing reactive code is when the events come in too quickly. A fast-moving stream of events can overwhelm your program’s processing.

Solution

Rx provides operators specifically for dealing with a flood of event data. The Throttle and Sample operators give us two different ways to tame fast input events.

The Throttle operator establishes a sliding timeout window. When an incoming event arrives, it resets the timeout window. When the timeout window expires, it publishes the last event value that arrived within the window.

This example monitors mouse movements but uses Throttle to only report updates once the mouse has stayed still for a full second:

private void Button_Click(object sender, RoutedEventArgs e)

{

Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

handler => (s, a) => handler(s, a),

handler => MouseMove += handler,

handler => MouseMove -= handler)

.Select(x => x.EventArgs.GetPosition(this))

.Throttle(TimeSpan.FromSeconds(1))

.Subscribe(x => Trace.WriteLine(

DateTime.Now.Second + ": Saw " + (x.X + x.Y)));

}

The output varies considerably based on mouse movement, but one example run on my machine looked like this:

47: Saw 139

49: Saw 137

51: Saw 424

56: Saw 226

Throttle is often used in situations such as autocomplete, when the user is typing text into a textbox, but you don’t want to do the actual lookup until the user stops typing.

Sample takes a different approach to taming fast-moving sequences. Sample establishes a regular timeout period and publishes the most recent value within that window each time the timeout expires. If there were no values received within the sample period, then no results are published for that period.

The following example captures mouse movements and samples them in one-second intervals. Unlike the Throttle example, the Sample example does not require you to hold the mouse still to see data.

private void Button_Click(object sender, RoutedEventArgs e)

{

Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

handler => (s, a) => handler(s, a),

handler => MouseMove += handler,

handler => MouseMove -= handler)

.Select(x => x.EventArgs.GetPosition(this))

.Sample(TimeSpan.FromSeconds(1))

.Subscribe(x => Trace.WriteLine(

DateTime.Now.Second + ": Saw " + (x.X + x.Y)));

}

Here’s the output on my machine when I first left the mouse still for a few seconds and then continuously moved it:

12: Saw 311

17: Saw 254

18: Saw 269

19: Saw 342

20: Saw 224

21: Saw 277

Discussion

Throttling and sampling are essential tools for taming the flood of input. Don’t forget that you can also easily do filtering with the standard LINQ Where operator. You can think of the Throttle and Sample operators as similar to Where, only they filter on time windows instead of filtering on event data. All three of these operators help you tame fast-moving input streams in different ways.

See Also

Recipe 5.1 covers how to create sequences from events.

Recipe 5.2 covers how to change the context in which events are raised.

5.5. Timeouts

Problem

You expect an event to arrive within a certain time and need to ensure that your program will respond in a timely fashion, even if the event does not arrive. Most commonly, this kind of expected event is a single asynchronous operation (e.g., expecting the response from a web service request).

Solution

The Timeout operator establishes a sliding timeout window on its input stream. Whenever a new event arrives, the timeout window is reset. If the timeout expires without seeing an event in that window, the Timeout operator will end the stream with an OnError notification containing aTimeoutException.

This example issues a web request for the example domain and applies a timeout of one second:

private void Button_Click(object sender, RoutedEventArgs e)

{

var client = new HttpClient();

client.GetStringAsync("http://www.example.com/").ToObservable()

.Timeout(TimeSpan.FromSeconds(1))

.Subscribe(

x => Trace.WriteLine(DateTime.Now.Second + ": Saw " + x.Length),

ex => Trace.WriteLine(ex));

}

Timeout is ideal for asynchronous operations, such as web requests, but it can be applied to any event stream. The following example applies Timeout to mouse move events, which are easier to play around with:

private void Button_Click(object sender, RoutedEventArgs e)

{

Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

handler => (s, a) => handler(s, a),

handler => MouseMove += handler,

handler => MouseMove -= handler)

.Select(x => x.EventArgs.GetPosition(this))

.Timeout(TimeSpan.FromSeconds(1))

.Subscribe(

x => Trace.WriteLine(DateTime.Now.Second + ": Saw " + (x.X + x.Y)),

ex => Trace.WriteLine(ex));

}

On my machine, I moved the mouse a bit and then let it sit still for a second, and got these results:

16: Saw 180

16: Saw 178

16: Saw 177

16: Saw 176

System.TimeoutException: The operation has timed out.

Note that once the TimeoutException is sent to OnError, the stream is finished. No more mouse move events come through. You may not want exactly this behavior, so the Timeout operator has overloads that substitute a second stream when the timeout occurs instead of ending the stream with an exception.

This example observes mouse moves until there is a timeout and then switches to observing mouse clicks:

private void Button_Click(object sender, RoutedEventArgs e)

{

var clicks = Observable.FromEventPattern

<MouseButtonEventHandler, MouseButtonEventArgs>(

handler => (s, a) => handler(s, a),

handler => MouseDown += handler,

handler => MouseDown -= handler)

.Select(x => x.EventArgs.GetPosition(this));

Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(

handler => (s, a) => handler(s, a),

handler => MouseMove += handler,

handler => MouseMove -= handler)

.Select(x => x.EventArgs.GetPosition(this))

.Timeout(TimeSpan.FromSeconds(1), clicks)

.Subscribe(

x => Trace.WriteLine(

DateTime.Now.Second + ": Saw " + x.X + "," + x.Y),

ex => Trace.WriteLine(ex));

}

On my machine, I moved the mouse a bit, then held it still for a second, and then clicked on a couple different points. The output is below, showing the mouse-move events quickly moving through until the timout, and then the two click events:

49: Saw 95,39

49: Saw 94,39

49: Saw 94,38

49: Saw 94,37

53: Saw 130,141

55: Saw 469,4

Discussion

Timeout is an essential operator in nontrivial applications because you always want your program to be responsive even if the rest of the world is not. It’s particularly useful when you have asynchronous operations, but it can be applied to any event stream. Note that the underlying operation is not actually canceled; in the case of a timeout, the operation will continue executing until it succeeds or fails.

See Also

Recipe 5.1 covers how to create sequences from events.

Recipe 7.6 covers wrapping asynchronous code as an observable event stream.

Recipe 9.6 covers unsubscribing from sequences as a result of a CancellationToken.

Recipe 9.3 covers using a CancellationToken as a timeout.