Taming Streaming Data with Rx.NET - Thinking in LINQ: Harnessing the power of functional programming in .NET applications (2014)

APPENDIX B. Taming Streaming Data with Rx.NET

Developers now have access to more streaming data than ever before. Much of this data originates from sensors connected to the Internet. These sensors continually post the data they collect.

For example, a high-profile gym asks users to swipe their smart membership cards at an RFID reader placed at the entrance. As soon as members do this, an event fires (assume the name of the event is MemberEnteredGym). Subscribers can register for the event, and are notified immediately. However, that’s nothing new. What’s new is that this event will be fired every time a member swipes his or her card. And there may be thousands or millions of members around the world, each generating an event each time they enter their gym. Suddenly, you have a pool of events, or an event stream.

There are two inherent limitations of a .NET event. First, events don’t offer composability. If you have a stream of MemberEnteredGym events and you want to filter that stream for unauthenticated or fraudulent accesses, your only option is to write the filtering code inside a particular subscriber’s event handler. As a result, you can’t build composable solutions with LINQ.

Second, .NET events aren’t first-class citizens in the .NET ecosystem. You can pass an integer to a function, and you can even pass a function to another function (provided you store it in a delegate variable such as a Func<>). But you can’t pass around a .NET event, such as the MouseMove event of a Windows Forms class.

When you move your mouse over a form, the form raises MouseMove events. With a traditional event, however, there is no object representing the sequence of MouseMove event arguments (where the mouse has been so far) that you could pass around.

Rx.NET is a framework that translates such events into an Observable collection of event arguments that interested parties can subscribe to. Rx.NET piggybacks on the composable nature of LINQ and is built on a push-based architecture rather than the pull-based model of IEnumerable.

Push and pull have a good real-life analogy. Suppose you go to a busy restaurant for Sunday brunch. You place your order and then wait for your meal to appear. After a while, when your order still hasn’t shown up, you get a little restless and ask about its status at the counter or ask your waiter. Asking is polling, or pulling. Some restaurants, on the other hand, now give customers an RFID-enabled vibrating pager when they place their orders. When the order is ready, the pager receives a signal and buzzes, so the customer knows the order is ready for pickup. This is a push scheme: the source (the restaurant) notifies the targets (the people who placed orders).

Figure B-1 captures this distinction in general terms.

Figure B-1. The intent of push and pull operations in a .NET eventing context

In the Interactive model, shown in Figure B-2, the consumers of an enumeration ask for the next available element by calling MoveNext() on the iterator. In the Reactive world, by contrast, the source (the Observable) pushes each value to its observers as it is generated, by calling OnNext(). The Reactive model has two more methods: OnError(), which is called when an error (an exception) occurs, and OnCompleted(), which is called when the sequence has no more elements.

Figure B-2. Bart De Smet’s depiction of the Interactive and Reactive paradigms

Figure B-3 shows a comparison between using IObserver and IEnumerator.

Figure B-3. Side-by-side comparison of the main interfaces of the Reactive and Interactive paradigms

A Brief Explanation of the Interfaces

IEnumerator is the basis of a pull-based world, where client code polls for the next element by calling the enumerator’s MoveNext() method. One of three things can happen during this activity. First, if everything works as expected, MoveNext() returns true and the client code obtains the next element T by reading Current. Second, there may be no more items to iterate over, in which case MoveNext() returns false. This is the same as successful completion of the iteration, and the code must handle that condition. Third, an error can occur during this process, and again the client code must handle that condition.

Put another way, calling MoveNext can result in a T, a signal of completion, or an Exception signaling an error. These three situations are mirrored in a push-based world by the OnNext(), OnCompleted(), and OnError() methods of the IObserver<T> interface. Bart De Smet explains these concepts well in a video at http://channel9.msdn.com/Shows/Going+Deep/Bart-De-Smet-Observations-on-IQbservable-The-Dual-of-IQueryable, which I recommend watching.
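To make the duality concrete, here is a minimal sketch written as a C# top-level program. It assumes a reference to the current System.Reactive NuGet package rather than the 2.2.5 assemblies used in this chapter, and uses Rx’s Observer.Create helper to build an IObserver<int> from three delegates. The same three values are first pulled through an IEnumerator<int> and then pushed into the observer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;       // Observer.Create
using System.Reactive.Linq;  // ToObservable

int[] source = { 1, 2, 3 };

// Pull: the consumer drives the loop. MoveNext() returning false means
// "completed"; an exception thrown from MoveNext() would be the error case.
var pulled = new List<int>();
using (IEnumerator<int> e = ((IEnumerable<int>)source).GetEnumerator())
{
    while (e.MoveNext())
        pulled.Add(e.Current);
}

// Push: the source drives the consumer through the three IObserver<T> methods.
var pushed = new List<int>();
bool completed = false;
IObserver<int> observer = Observer.Create<int>(
    x => pushed.Add(x),                                 // OnNext
    ex => Console.WriteLine($"OnError: {ex.Message}"),  // OnError
    () => completed = true);                            // OnCompleted

using (source.ToObservable().Subscribe(observer))
{
    // ToObservable() iterates on the current thread by default, so every value
    // and the completion signal have been delivered by the time Subscribe returns.
}

Console.WriteLine($"pulled: {string.Join(",", pulled)}");                        // pulled: 1,2,3
Console.WriteLine($"pushed: {string.Join(",", pushed)}, completed: {completed}"); // pushed: 1,2,3, completed: True
```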

In the rest of this chapter, you’ll see how to use Rx.NET to tame incoming event streams. The Observable class has a number of methods for creating and processing event streams. Many of them mirror methods of the Enumerable class but work on Observables; for example, Observable.Range() mirrors Enumerable.Range(). Some of these methods are explained in this chapter.

Getting Rx.NET

You can download Rx.NET from http://msdn.microsoft.com/en-in/data/gg577610.

Rx.NET is being made available for several platforms, including Windows Phone.

Using Rx.NET in LINQPad

LINQPad is one of the coolest tools out there for .NET developers, and we are all grateful to Joseph Albahari for creating it. One special LINQPad feature can even help you understand Rx.NET better.

To configure LINQPad to use Rx.NET, you need to add references for the following assemblies:

·     System.Reactive.Core.dll

·     System.Reactive.Interfaces.dll

·     System.Reactive.Linq.dll

Note  Every example in this chapter uses version 2.2.5 of all these assemblies. By the time this book is published, there may be a newer stable version available, but it’s also possible that some of the methods I’ve used will have been deprecated. So to follow the examples, I recommend getting version 2.2.5. For your own experiments, feel free to use a more recent version.

Press F4 in LINQPad to open the dialog box for adding the DLLs. After you add them, the list should look like Figure B-4. Make sure to click the Set as Default for New Queries button.

Figure B-4. The list of Rx.NET references in LINQPad

This will make sure that you get Rx.NET extension methods and classes on all the new LINQPad tabs that you open.

Next, click the Additional Namespace Imports tab and add the Rx.NET namespaces shown in Figure B-5.

Figure B-5. Showing additional namespaces to include for using Rx.NET

Click the Set as Default for New Queries button in this tab as well.

Finally, click OK to save your settings and exit.

Now that you are ready to use LINQPad with Rx.NET, it’s a good time to mention the special feature that LINQPad offers. You can use LINQPad’s Dump() method to dump Observables to the output. While an Observable is still being evaluated, its Dump() result header stays green; when the evaluation completes (in other words, when the Observable calls OnCompleted()), the header turns blue (green and blue are LINQPad’s default colors). I found this transition extremely helpful while running Rx.NET queries.

You can also terminate evaluation of any long-running Observable by pressing Ctrl+Shift+F5; LINQPad shows a reminder of this shortcut at the bottom right of its window.

Now use the following query to make sure you are ready to run Rx queries in LINQPad.

Open a new LINQPad window and paste in the following query. Change the Language drop-down value to C# Statements.

Observable.Interval(TimeSpan.FromSeconds(1))
.Take(10)
.Dump("Slow stream");

Observable.Interval(TimeSpan.FromSeconds(0.5))
.Take(10)
.Dump("Fast stream");

This should generate the outputs shown in Figure B-6. I’ve shown only three snapshots here, captured at different times: the first dump, one in the middle, and one at the end. The operation is best captured on video, so I recorded my result and posted it at https://www.youtube.com/watch?v=mYinKlHET5s&feature=youtu.be.

Figure B-6. Special feature of LINQPad

Creating Observables and Subscribing

The Observable class offers several static methods for creating Observables, so it’s unlikely that you will ever need to implement IObservable<T> in your own class.

Here are some of the methods used most frequently to generate Observables:

·     Range

·     Repeat

·     Never

·     Throw (produces an Observable that terminates with an exception)

·     Interval

·     Generate

·     ToObservable

The following sections describe how to use these methods to generate and subscribe to Observables.

Range

The Range() method creates an Observable that produces a sequence of integers, given a start value and a count. This is conceptually the same as creating a range with Enumerable.Range().

Note  Set the Language drop-down to C# Statements for all the code in this chapter unless instructed otherwise.

Add the following code in a new LINQPad tab:

var range = Observable.Range(1, 10);
range.Take(4).Dump();

When you run the query, it generates the output shown in Figure B-7.

Figure B-7. Showing first four values of the Observable collection created with Range()

Notice that the type of the output is IObservable<int>.

Repeat

The Repeat method does just what you probably expect—it repeats the given value. For example, if you want an infinite Observable of the value 42, you can write it like this:

Observable.Repeat(42).Dump();

Because this sequence never ends, you need to limit it, which you can do by appending a Take() call, like this:

Observable.Repeat(42).Take(10).Dump();

This will return an Observable with 42 repeated ten times.
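Outside LINQPad you can check this by collecting the values instead of dumping them. This sketch is a C# top-level program that assumes the System.Reactive NuGet package; ToList() gathers the stream into one list and Wait() blocks until the stream completes. It also shows the Repeat overload that takes a repeat count, which makes the separate Take() call unnecessary.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// An infinite stream of 42s, made finite with Take().
IList<int> viaTake = Observable.Repeat(42).Take(10).ToList().Wait();

// The same result from the overload that takes a repeat count.
IList<int> viaCount = Observable.Repeat(42, 10).ToList().Wait();

Console.WriteLine(string.Join(",", viaCount)); // 42,42,42,42,42,42,42,42,42,42
```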

Never

The Never method is easily confused with Empty. Observable.Empty (the counterpart of Enumerable.Empty) produces no values and completes immediately, whereas Never produces no values and never completes, so in LINQPad its Dump() header stays green until you stop the query. Never can’t infer the type of the Observable, so you must supply the element type as a generic type argument, as shown here:

Observable.Never<int>().Dump();
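The difference from Empty is easy to observe. In this sketch (a C# top-level program assuming the System.Reactive NuGet package), the Empty stream completes at once, while the Never stream would block Wait() forever, so Timeout() is used to turn its silence into a TimeoutException after an arbitrary 200 ms.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Empty: no values, then OnCompleted right away.
IList<int> fromEmpty = Observable.Empty<int>().ToList().Wait();

// Never: no values and no completion, so waiting on it directly would hang.
bool neverTimedOut = false;
try
{
    Observable.Never<int>().Timeout(TimeSpan.FromMilliseconds(200)).ToList().Wait();
}
catch (TimeoutException)
{
    neverTimedOut = true; // Timeout() reported the silence as an error
}

Console.WriteLine($"Empty: {fromEmpty.Count} values; Never timed out: {neverTimedOut}");
// Empty: 0 values; Never timed out: True
```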

Interval

The Interval() method creates an Observable that produces an element at a regular interval. You specify the period as a TimeSpan. Here’s an example:

var times = Observable.Interval(TimeSpan.FromSeconds(2)).Take(10);
times.Dump();

This generates ten values—one every 2 seconds.
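Here is a sketch of the same idea that finishes quickly, as a C# top-level program assuming the System.Reactive NuGet package; the 100 ms period is arbitrary. It shows that Interval emits a long counter starting at 0.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// One value every 100 ms, stopped after five values.
IObservable<long> ticks = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5);

// Interval produces values on a background scheduler;
// Wait() blocks this thread until Take(5) completes the stream.
IList<long> values = ticks.ToList().Wait();
Console.WriteLine(string.Join(", ", values)); // 0, 1, 2, 3, 4
```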

Generate

Using Generate, you can generate arbitrary Observable collections. The method takes four arguments:

·     A seed value.

·     A predicate that returns a Boolean and determines how long the Observable should run. Generate continues to generate data as long as this returns true.

·     A delegate that tells Generate how to update the seed value to the next one.

·     A result selector, which projects each state to the value the Observable emits.

Here is an example showing how to use Generate to generate Fibonacci numbers:

KeyValuePair<int,int> seed = new KeyValuePair<int,int>(0,1);

Observable.Generate(
//Start with this seed value
seed
// Run it eternally
,x=>true
//Here is how to step through to go to the next one
,x => new KeyValuePair<int,int>(x.Value,x.Key+x.Value)
//Return the “Key” of the key value pair.
,x => x.Key)
.Take(10)
.Dump("First 10 Fibonacci numbers");

This is an efficient way to generate Fibonacci numbers because it stores only the last two numbers, in a single KeyValuePair, and passes that state from one step to the next. Strictly speaking, this is iteration over a small piece of state rather than memoization: the Observable doesn’t have to remember any of the values it has generated so far.

You can see the output in Figure B-8.

Figure B-8. Fibonacci numbers calculated using the Generate method
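The same Generate call can be verified outside LINQPad by collecting the values. In this sketch (a C# top-level program assuming the System.Reactive NuGet package), a value tuple replaces KeyValuePair<int,int>, but the seed, condition, step, and projection are the same. Keep in mind that int overflows quickly: F(47), 2,971,215,073, already exceeds int.MaxValue, so switch the state to long if you need more terms.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

IObservable<int> fibonacci = Observable.Generate(
    (Current: 0, Next: 1),                              // seed: F(0) and F(1)
    state => true,                                      // never stop on our own; Take() bounds it
    state => (state.Next, state.Current + state.Next), // slide the pair forward
    state => state.Current);                            // emit the first number of the pair

IList<int> firstTen = fibonacci.Take(10).ToList().Wait();
Console.WriteLine(string.Join(", ", firstTen)); // 0, 1, 1, 2, 3, 5, 8, 13, 21, 34
```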

An earlier version of Rx.NET had a member called GenerateWithTime that could produce values at a given frequency supplied in the form of a TimeSpan. That method is no longer available, but you can still generate values at a certain frequency. Just replace the line

,x => x.Key)

with

,x => { Thread.Sleep(TimeSpan.FromMilliseconds(500)); return x.Key; })

Now you have an Observable that churns out the next Fibonacci number every half-second. Such techniques can be extremely useful in simulations. For example, suppose you were writing a car-parking simulation whose model assumes that the number of cars arriving in each half-second interval follows the Fibonacci sequence.
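Thread.Sleep works, but it blocks the thread that runs the iteration for the whole delay. Rx also provides a Generate overload with a fifth argument, a time selector of type Func<TState, TimeSpan>, which returns how long to wait before producing each value; the delay is then scheduled on a timer instead. Here is a sketch as a C# top-level program assuming the System.Reactive NuGet package, with an arbitrary 50 ms delay so the run stays short:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

IObservable<int> pacedFibonacci = Observable.Generate(
    (Current: 0, Next: 1),
    state => true,
    state => (state.Next, state.Current + state.Next),
    state => state.Current,
    state => TimeSpan.FromMilliseconds(50)); // wait 50 ms before producing each value

// The delays are scheduled on a timer rather than blocking a thread with Thread.Sleep.
IList<int> firstFive = pacedFibonacci.Take(5).ToList().Wait();
Console.WriteLine(string.Join(", ", firstFive)); // 0, 1, 1, 2, 3
```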

ToObservable

ToObservable converts Enumerables to Observables. This method comes in very handy when you write unit tests for Observables, because it lets you feed a known sequence of values into a query.

Here’s a simple example that takes an array of integers and returns the corresponding Observable:

(new int[]{1,2,3}).ToObservable().Dump();
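Here is what that looks like in a test-style sketch: a C# top-level program assuming the System.Reactive NuGet package, where EvenSquares is a hypothetical query under test. The array supplies a known input, and ToList().Wait() collects the output so it can be compared with the expected values.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical query under test: keep the even numbers and square them.
IObservable<int> EvenSquares(IObservable<int> source) =>
    source.Where(x => x % 2 == 0).Select(x => x * x);

// A known input from an array instead of a live event source.
IObservable<int> input = new[] { 1, 2, 3, 4, 5, 6 }.ToObservable();
IList<int> result = EvenSquares(input).ToList().Wait();

Console.WriteLine(string.Join(", ", result)); // 4, 16, 36
```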

Creating Observables from .NET Events

Most of the time in real-life applications, your Observable collections are generated from .NET events. To convert a .NET event to an event stream, you can use the Observable.FromEventPattern method.

Open a new LINQPad window and add the following code:

Form myForm = new Form();
myForm.Show();
var moves = Observable.FromEventPattern<MouseEventArgs>
(myForm,"MouseMove")
.Select( p => p.EventArgs.Location);

var bisector = moves.Where(p => p.X == p.Y);
bisector.Dump("You are on the bisector at");

Note  To run this code, you need to add System.Windows.Forms.dll to the references. Then, on the Additional Namespace Imports tab, click the Pick from Assemblies link, click System.Windows.Forms.dll, and select System.Windows.Forms from the Namespaces list.

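Running the code creates a Windows form. When you move your mouse over the form, LINQPad shows the coordinates of the mouse pointer whenever it lands on the form’s bisector, the line where X equals Y. This is a 45-degree line running down and to the right from the upper-left corner of the client area (it ends at the bottom-right corner only if the client area is square). Fast mouse movements can skip over it, because MouseMove reports sampled positions rather than every pixel the pointer passes.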

Figure B-9 shows some typical output.

Figure B-9. Points where the mouse crossed the form’s bisector

Again, this example is best shown on video. I recorded my experiment with LINQPad for this example and posted it at https://www.youtube.com/watch?v=4FTkRF7aYLo. If you’re not running the example, the video will help: Observables produce values over time, so there is a very real sense of time involved, and the concepts are difficult to grasp solely by reading.
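If you aren’t on Windows, or want to see the mechanics without a form, FromEventPattern also has an overload that takes two delegates: one that attaches Rx’s handler and one that detaches it. In this sketch (a C# top-level program assuming the System.Reactive NuGet package), a local PropertyChangedEventHandler variable stands in for a real event, and the property names are made up. Rx attaches its handler when you subscribe and detaches it when the subscription is disposed.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Reactive.Linq;

// A plain delegate variable stands in for an object's PropertyChanged event.
PropertyChangedEventHandler? propertyChanged = null;

IObservable<string?> changedNames = Observable
    .FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
        handler => propertyChanged += handler,   // attach (on Subscribe)
        handler => propertyChanged -= handler)   // detach (on Dispose)
    .Select(pattern => pattern.EventArgs.PropertyName);

var seen = new List<string?>();
using (changedNames.Subscribe(name => seen.Add(name)))
{
    // Raising the "event" pushes each EventArgs into the stream.
    propertyChanged?.Invoke(null, new PropertyChangedEventArgs("Width"));
    propertyChanged?.Invoke(null, new PropertyChangedEventArgs("Height"));
}

// After Dispose, nothing more reaches the subscriber.
propertyChanged?.Invoke(null, new PropertyChangedEventArgs("Color"));
Console.WriteLine(string.Join(", ", seen)); // Width, Height
```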

Subscribe

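The Subscribe method attaches an observer to an Observable; it’s how you consume the values outside LINQPad’s Dump(). Its simplest overload takes a single onNext delegate that is called for every value; other overloads also take onError and onCompleted delegates. Subscribe returns an IDisposable, and disposing it cancels the subscription. You will find examples of this method later in this chapter.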
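Here is a minimal sketch of the three-callback overload, as a C# top-level program assuming the System.Reactive NuGet package. It logs each notification from a Range and from a Throw stream, which terminates with an error instead of producing values. If you pass only onNext and the stream fails, Rx rethrows the exception rather than ignoring it, so supply onError whenever errors are possible.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var log = new List<string>();

// onNext, onError and onCompleted map to the three IObserver<T> methods.
IDisposable subscription = Observable.Range(1, 3).Subscribe(
    x => log.Add($"OnNext({x})"),
    ex => log.Add($"OnError({ex.Message})"),
    () => log.Add("OnCompleted"));
subscription.Dispose(); // unsubscribes; harmless here because Range has already completed

// Throw creates a stream that terminates immediately with an error.
Observable.Throw<int>(new InvalidOperationException("boom")).Subscribe(
    x => log.Add($"OnNext({x})"),
    ex => log.Add($"OnError({ex.Message})"),
    () => log.Add("OnCompleted"));

Console.WriteLine(string.Join(" ", log));
// OnNext(1) OnNext(2) OnNext(3) OnCompleted OnError(boom)
```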

Combining Observable Collections

In real-life applications, you often get Observable collections from more than one source and need to combine them to get a single Observable collection. Fortunately, there are several useful extension methods to do just that:

·     Concat

·     Merge

·     Amb

·     Zip

Concat

Concat appends two or more Observables, one after the other: it subscribes to the next Observable only after the previous one completes.

The image in Figure B-10 is a marble diagram showing how Concat works. A marble diagram shows the values from several Observables as colored marbles.

Figure B-10. Marble diagram for Concat

Paste the following code into LINQPad to see how Concat works:

var range1 = Observable.Range(11,5).Select(x => (long)x);
var inte1 = Observable.Interval(TimeSpan.FromSeconds(.5)).Take(5);

range1.Concat(inte1).Dump();

Running this code produces the output shown in Figure B-11.

Figure B-11. A Concat result.
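The ordering guarantee can be checked outside LINQPad. This sketch is the example above as a C# top-level program, assuming the System.Reactive NuGet package, with a shorter 50 ms period. The cast to long is needed because Concat requires both sequences to have the same element type and Interval produces long values.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

IObservable<long> range1 = Observable.Range(11, 5).Select(x => (long)x);
IObservable<long> inte1 = Observable.Interval(TimeSpan.FromMilliseconds(50)).Take(5);

// Concat subscribes to inte1 only after range1 completes,
// so the order is deterministic even though inte1 is timer-based.
IList<long> all = range1.Concat(inte1).ToList().Wait();
Console.WriteLine(string.Join(", ", all)); // 11, 12, 13, 14, 15, 0, 1, 2, 3, 4
```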

Merge

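The Merge method combines two Observable collections, forwarding values from both as they arrive, as depicted in the marble diagram in Figure B-12. The X in the first line represents an exception; in other words, the first Observable collection threw an exception. An error in either source terminates the merged Observable with that error.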

Figure B-12. Marble diagram for the Merge method

Add the following code to LINQPad in a new tab and change the Language drop-down to C# Program.

void Main()
{
    var slow = GetSomeTokens().ToObservable();
    var fast = GetSomeTokensFast().ToObservable();
    Observable.Merge(slow,fast).Dump();
}

// One shared Random: creating a new Random on each iteration can yield identical
// seeds (and therefore identical delays) when the calls happen close together.
static readonly Random rng = new Random();

public IEnumerable<string> GetSomeTokensFast()
{
    string[] names = {"A","B","C","D","E","F","G"};
    for(int i = 0;i<names.Length;i++)
    {
        Thread.Sleep(rng.Next(500)); // pause up to 0.5 s between tokens
        yield return names[i];
    }
}

public IEnumerable<string> GetSomeTokens()
{
    string[] names = {"Af","fB","fD","fE","fF","fG"};
    for(int i = 0;i<names.Length;i++)
    {
        Thread.Sleep(rng.Next(1000)); // pause up to 1 s between tokens
        yield return names[i];
    }
}

This generates output like that shown in Figure B-13, with the tokens from both sequences combined into a single stream.

Figure B-13. The result of Merge
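Timer-based streams make the interleaving easier to see than the blocking iterators above. In this sketch (a C# top-level program assuming the System.Reactive NuGet package; the slow/fast names and periods are illustrative), Merge forwards every value from both sources as it arrives. The exact interleaving varies from run to run, but every value appears once and each source keeps its own order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

IObservable<string> slow = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Take(3).Select(i => $"slow{i}");
IObservable<string> fast = Observable.Interval(TimeSpan.FromMilliseconds(40))
    .Take(5).Select(i => $"fast{i}");

// Merge completes only after both sources have completed.
IList<string> merged = Observable.Merge(slow, fast).ToList().Wait();
Console.WriteLine(string.Join(" ", merged)); // interleaving varies from run to run

// Each source's values still arrive in their original order.
var slowPart = merged.Where(s => s.StartsWith("slow")).ToList();
Console.WriteLine(string.Join(" ", slowPart)); // slow0 slow1 slow2
```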

Amb

The Amb method subscribes to all the sources, picks the Observable that reacts first, and ignores the other sources from then on. Amb is short for ambiguous. As shown in Figure B-14, because the winner depends on timing, this method can generate a different result each time it is used.

Figure B-14. Marble diagram for Amb

To explore how Amb works, just change the method in the preceding example from Merge to Amb. You will get output like that shown in Figure B-15.

Figure B-15. The output from Amb()
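The same behavior can be reproduced with two timer-based sources whose speeds differ widely. In this sketch (a C# top-level program assuming the System.Reactive NuGet package; the names and periods are illustrative), the 30 ms source reacts first, so Amb mirrors it and drops the 300 ms source. With periods this far apart the winner should be the same on every run, but it is still decided by timing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

IObservable<string> slow = Observable.Interval(TimeSpan.FromMilliseconds(300))
    .Take(3).Select(i => $"slow{i}");
IObservable<string> fast = Observable.Interval(TimeSpan.FromMilliseconds(30))
    .Take(3).Select(i => $"fast{i}");

// Amb subscribes to both, mirrors whichever produces a value first,
// and unsubscribes from the other one.
IList<string> winner = Observable.Amb(slow, fast).ToList().Wait();
Console.WriteLine(string.Join(" ", winner)); // fast0 fast1 fast2
```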

Zip

The Zip() method works the same way it does for Enumerables: it pairs the first value of each source, then the second values, and so on. The example used for Merge and Amb is just as useful for showing how Zip works. Just change the Merge (or Amb) call in Main to Zip:

Observable.Zip(slow,fast).Dump();

Note that Zip yields the pair at each index only when both Observables have produced a value at that index, so the pace of the output is set by the slower Observable.

Zip generates the output shown in Figure B-16.

Figure B-16. The result of Zip applied on two Observables
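Zip also has an overload that takes a result selector, which saves you from working with a list of pairs. In this sketch (a C# top-level program assuming the System.Reactive NuGet package; names and periods are illustrative), the nth result is produced once both sources have produced their nth value. The output is therefore deterministic even though the timing is not, and the extra values from the longer source are dropped.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

IObservable<string> slow = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Take(3).Select(i => $"slow{i}");
IObservable<string> fast = Observable.Interval(TimeSpan.FromMilliseconds(40))
    .Take(5).Select(i => $"fast{i}");

// Pair the nth value of each source; fast3 and fast4 have no partner and are dropped.
IList<string> pairs = slow.Zip(fast, (s, f) => $"{s}+{f}").ToList().Wait();
Console.WriteLine(string.Join(" ", pairs)); // slow0+fast0 slow1+fast1 slow2+fast2
```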

Here is a common scenario in which you might want to use Zip. Let’s say you have two servers that are sending e-book prices, and you want to show readers the cheaper price for each book. Done traditionally, you would have to buffer the prices from both servers and match them up yourself; Zip does the matching as the prices arrive.

Here’s a simulation of this example. Paste the following code into LINQPad as a new C# Statements query:

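// Replay() runs each price feed once and shares its values with every subscriber.
// Without it, Dump() and Zip() would each start their own feed with different
// random prices, so the "cheapest" prices wouldn't match the dumped ones.
var ebookPricesOne = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(x => new Random().NextDouble() * 10)
    .Take(4)
    .Replay();
var ebookPricesTwo = Observable.Interval(TimeSpan.FromSeconds(.5))
    .Select(x => new Random().NextDouble() * 10)
    .Take(4)
    .Replay();

ebookPricesOne.Dump("First Service");
ebookPricesTwo.Dump("Second Service");

// Pair the nth price from each service and keep the cheaper one.
Observable.Zip(ebookPricesOne, ebookPricesTwo, (one, two) => Math.Min(one, two))
    .Dump("Cheapest e-book prices");

// Start both feeds once every subscriber is attached.
ebookPricesOne.Connect();
ebookPricesTwo.Connect();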

Running this code will produce a result similar to that shown in Figure B-17. The code generates different output every time you run it, because it works with random numbers, but the overall effect remains the same.

Figure B-17. The result of Zip() to find out the cheapest e-book price

Partitioning Observables

Streaming data is often serviced by multiple servers, and sending each server the right amount of data (in other words, dividing the stream into suitably sized chunks) can be difficult. Buffer and Window are the two functions that make this easier. They are conceptually similar, but Buffer returns an IObservable<IList<T>>, emitting each chunk as a list once it is complete, while Window returns an IObservable<IObservable<T>>, emitting each chunk as a nested Observable whose values arrive as they are produced.

Window

Window(count) splits the source into consecutive, nonoverlapping windows of count values each:

var times = Observable.Interval(TimeSpan.FromSeconds(2))
.Take(10);
times.Window(2).Dump();

The output of this code is shown in Figure B-18.

Figure B-18. The result of Window

As you can see, these sliced/buffered data values can now be passed to several servers for further processing.
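To compare the two operators directly, this sketch (a C# top-level program assuming the System.Reactive NuGet package) splits the same five values with Buffer(2) and Window(2). Show is a small hypothetical helper for printing, and each window is collected with ToList() so that both results have the same shape.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

IObservable<int> source = Observable.Range(1, 5);

// Buffer(2) emits each chunk as a ready-made list once it is full;
// the last, partial chunk is emitted when the source completes.
IList<IList<int>> buffers = source.Buffer(2).ToList().Wait();

// Window(2) emits each chunk as an IObservable<int> as soon as it opens;
// here each window is collected into a list to compare with Buffer.
IList<IList<int>> windows = source.Window(2).SelectMany(w => w.ToList()).ToList().Wait();

string Show(IList<IList<int>> chunks) =>
    string.Join(" ", chunks.Select(c => "[" + string.Join(",", c) + "]"));

Console.WriteLine(Show(buffers)); // [1,2] [3,4] [5]
Console.WriteLine(Show(windows)); // [1,2] [3,4] [5]
```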

Time-Tagging Observables

Sometimes it is important to know the timestamp of data as it arrives in the form of an Observable. It can also be important to know how much time elapsed between consecutive values from the source. The methods described in the following sections help with both.

Timestamp

The Timestamp method adds a timestamp for each generated value:

var slow = Observable.Interval(TimeSpan.FromSeconds(2)).Take(10);
slow
.Timestamp()//Adds a timestamp value for each generated value
.Dump();

This code generates the output shown in Figure B-19.

Figure B-19. The values and their associated timestamps
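Each timestamped item exposes the original value and the time it was produced. Here is a sketch as a C# top-level program assuming the System.Reactive NuGet package, using Rx’s Timestamped<T> type and a shorter 100 ms period; the printed times will differ on every run.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

IList<Timestamped<long>> stamped = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Take(3)
    .Timestamp() // pairs each value with the DateTimeOffset at which it was produced
    .ToList()
    .Wait();

foreach (Timestamped<long> item in stamped)
    Console.WriteLine($"{item.Value} at {item.Timestamp:HH:mm:ss.fff}");
```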

TimeInterval

TimeInterval shows the time elapsed between consecutive values in an Observable (for the first value, the time since subscription).

The example from the Timestamp section works here too; just change the method call to TimeInterval. The output I got is shown in Figure B-20; your results will be different.

Figure B-20. Showing results of TimeInterval

These two methods can be handy. Imagine that you are using Rx.NET to process orders received from a website, and you see a surge of orders arriving at surprisingly short intervals. That could be a sign of fraudulent transactions; maybe a customer’s account has been compromised. A technique called anomaly detection, in simple terms, finds the “odd one out.” Using intervals like those in Figure B-20, you can run anomaly detection to find fraudulent transactions or faulty products.
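Here is a minimal sketch of that idea, as a C# top-level program assuming the System.Reactive NuGet package. The order stream is hypothetical: it is simulated with timers, and the gaps and the 100 ms threshold are made up for illustration. TimeInterval measures the gap before each order, and a Where clause flags the orders that arrive suspiciously fast. A real anomaly detector would derive its threshold from data rather than hard-coding it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical gaps (in ms) before each simulated order arrives.
int[] gapsMs = { 200, 200, 20, 200 };

// Each order is a timer that fires after its gap; Concat runs the timers one after another.
IObservable<string> orders = Observable.Concat(
    gapsMs.Select((gap, i) =>
        Observable.Timer(TimeSpan.FromMilliseconds(gap)).Select(_ => $"order{i}")));

// TimeInterval pairs each value with the time elapsed since the previous value
// (or since subscription, for the first one).
IList<string> suspicious = orders
    .TimeInterval()
    .Where(t => t.Interval < TimeSpan.FromMilliseconds(100))
    .Select(t => t.Value)
    .ToList()
    .Wait();

Console.WriteLine(string.Join(", ", suspicious)); // order2
```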

Rx.NET Showcase

Rx can be used for a whole array of problems where the data is streaming. It is being made available on several frameworks; for example, Windows Phone 7.1 has it.

This section contains two examples that I think capture the beauty and power that Rx brings to event-stream processing.

Creating a Signature Capture Screen

This example is a small Windows application for capturing signatures. Here’s the scenario. Courier companies want to get digital signatures from their customers, who—in the real world—use a stylus to write on a screen. This simulation replaces the stylus with the mouse for convenience.

Create a new LINQPad tab and add the following code:

Form sigCapture = new Form();
List<System.Drawing.Point> points = new List<System.Drawing.Point>();
bool draw = false;
sigCapture.Show();
var moves = Observable.FromEventPattern<MouseEventArgs>
(sigCapture,"MouseMove")
.Select(x => x.EventArgs);
var mouseDowns = Observable.FromEventPattern<MouseEventArgs>
(sigCapture,"MouseDown")
.Select(x => x.EventArgs);
var mouseUps = Observable.FromEventPattern<MouseEventArgs>
(sigCapture,"MouseUp")
.Select(x => x.EventArgs);
mouseDowns.Subscribe( x => { draw = true; });
mouseUps.Subscribe( x => { draw = false; });
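moves.Subscribe(p =>
{
    points.Add(p.Location);
    if(points.Count >= 2 && draw)
    {
        // Dispose the Graphics and Pen after each segment; creating them on every
        // mouse move without disposing them can exhaust GDI handles.
        using (var g = sigCapture.CreateGraphics())
        using (var pen = new System.Drawing.Pen(System.Drawing.Color.Purple, 5.7f))
        {
            g.DrawLine(pen, points[points.Count - 2], points[points.Count - 1]);
        }
    }
});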

This code creates a Windows form that you can draw on with the mouse. When you press the mouse button over the form, the draw flag becomes true, and subsequent mouse movements draw a line. The video at https://www.youtube.com/watch?v=KV4r_gyg424 shows me signing the form. That’s not my official signature! As an exercise, try adding functionality to save the signature and redraw it.

The code is simple. As the user moves the mouse over the form, the event stream generates data, and the mouse positions are added to the points collection. As soon as there are two or more points in the collection, the example uses GDI to draw a line between the last two points collected.

However, you don’t always want to draw a line whenever the user moves the mouse. There has to be a notion of “pen down” and “pen up,” and the draw flag fulfills that purpose. On MouseDown, the flag is set to true, and on MouseUp it’s set to false. That way, whenever the user presses the mouse button and moves the mouse, the example draws a line (or rather, multiple lines, one after the other), much like Microsoft Paint.
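The draw flag works, but Rx can express the same pen-down/pen-up logic as a query, without mutable state: for every mouse-down, take the moves until the next mouse-up. This sketch is a C# top-level program assuming the System.Reactive NuGet package. To keep it runnable without a form, Subject<T> instances stand in for the MouseDown, MouseMove, and MouseUp streams, and the coordinates are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-ins for the three FromEventPattern streams.
var mouseDowns = new Subject<Unit>();
var mouseUps = new Subject<Unit>();
var moves = new Subject<(int X, int Y)>();

// Only moves between a down and the next up survive; no "draw" flag needed.
IObservable<(int X, int Y)> penStrokes =
    mouseDowns.SelectMany(_ => moves.TakeUntil(mouseUps));

var drawn = new List<(int X, int Y)>();
using (penStrokes.Subscribe(p => drawn.Add(p)))
{
    moves.OnNext((1, 1));            // pen up: ignored
    mouseDowns.OnNext(Unit.Default);
    moves.OnNext((2, 2));            // pen down: recorded
    moves.OnNext((3, 3));            // recorded
    mouseUps.OnNext(Unit.Default);
    moves.OnNext((4, 4));            // pen up again: ignored
}

Console.WriteLine(string.Join(" ", drawn)); // (2, 2) (3, 3)
```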

Live File System Watcher

By watching file system events, a system administrator can create a live dashboard that reflects file activity in a directory.

For example let’s say you want to know whenever a file is created, deleted, changed, or renamed in a given directory. Using Rx, you can do this very easily.

Here is the complete code. Paste this in LINQPad and set the Language drop-down to C# Statements. Provide the path to a folder that you want to monitor.

//Set the folder you want to monitor.
System.IO.FileSystemWatcher w = new System.IO.FileSystemWatcher("C:\\Apress");

//Start the File System Watcher to watch events
w.EnableRaisingEvents = true;

//Find the files that have been created
var fileCreated = Observable
    .FromEventPattern<FileSystemEventArgs>(w, "Created")
    .Select(z =>
    {
        var file = new FileInfo(z.EventArgs.FullPath);
        return new
        {
            FullPath = z.EventArgs.FullPath,
            ChangeType = z.EventArgs.ChangeType,
            Name = z.EventArgs.Name,
            DirectoryName = file.DirectoryName
        };
    });

//Find the files that have been changed
var fileChanged = Observable
    .FromEventPattern<FileSystemEventArgs>(w, "Changed")
    .Select(z => new
    {
        FullPath = z.EventArgs.FullPath,
        ChangeType = z.EventArgs.ChangeType
    });

//Find the files that have been deleted
var fileDeleted = Observable
    .FromEventPattern<FileSystemEventArgs>(w, "Deleted")
    .Select(z => new
    {
        FullPath = z.EventArgs.FullPath,
        ChangeType = z.EventArgs.ChangeType
    });

//Find the files that have been renamed
var fileRenamed = Observable
    .FromEventPattern<RenamedEventArgs>(w, "Renamed")
    .Select(z => new
    {
        OldFullPath = z.EventArgs.OldFullPath,
        NewPath = z.EventArgs.FullPath,
        ChangeType = z.EventArgs.ChangeType
    });

fileCreated.Dump("Created");
fileRenamed.Dump("Renamed");
fileChanged.Dump("Changed");
fileDeleted.Dump("Deleted");

When you run this, you will see how the data gets surfaced as you create, rename, or change files in the specified directory.

Summary

Be warned! Rx.NET is not a replacement for the existing .NET eventing system. You should use Rx.NET only when you want to pass around the event arguments and process them in some way using the composable nature of LINQ and related frameworks. I hope you have already started thinking of how Rx.NET can help rewire some of your legacy code.