Interop - Concurrency in C# Cookbook (2014)

Chapter 7. Interop

Asynchronous, parallel, reactive—each has its place, but how well do they work together?

In this chapter, we’ll look at various interop scenarios and learn how to combine these different approaches. They complement each other rather than compete; there is very little friction at the boundaries where one approach meets another.

7.1. Async Wrappers for “Async” Methods with “Completed” Events

Problem

There is an older asynchronous pattern that uses methods named OperationAsync along with events named OperationCompleted. You wish to perform an operation like this and await the result.

TIP

The OperationAsync and OperationCompleted pattern is called the Event-based Asynchronous Pattern (EAP). We’re going to wrap those into a Task-returning method that follows the Task-based Asynchronous Pattern (TAP).

Solution

You can create wrappers for asynchronous operations by using the TaskCompletionSource<TResult> type. This type controls a Task<TResult> and allows you to complete the task at the appropriate time.

The following example defines an extension method for WebClient that downloads a string. The WebClient type defines DownloadStringAsync and DownloadStringCompleted. Using those, we can define a DownloadStringTaskAsync method as such:

    public static Task<string> DownloadStringTaskAsync(this WebClient client,
        Uri address)
    {
        var tcs = new TaskCompletionSource<string>();

        // The event handler will complete the task and unregister itself.
        DownloadStringCompletedEventHandler handler = null;
        handler = (_, e) =>
        {
            client.DownloadStringCompleted -= handler;
            if (e.Cancelled)
                tcs.TrySetCanceled();
            else if (e.Error != null)
                tcs.TrySetException(e.Error);
            else
                tcs.TrySetResult(e.Result);
        };

        // Register for the event and *then* start the operation.
        client.DownloadStringCompleted += handler;
        client.DownloadStringAsync(address);

        return tcs.Task;
    }

If you’re already using the Nito.AsyncEx NuGet library, wrappers like this are slightly simpler due to the TryCompleteFromEventArgs extension method in that library:

    public static Task<string> DownloadStringTaskAsync(this WebClient client,
        Uri address)
    {
        var tcs = new TaskCompletionSource<string>();

        // The event handler will complete the task and unregister itself.
        DownloadStringCompletedEventHandler handler = null;
        handler = (_, e) =>
        {
            client.DownloadStringCompleted -= handler;
            tcs.TryCompleteFromEventArgs(e, () => e.Result);
        };

        // Register for the event and *then* start the operation.
        client.DownloadStringCompleted += handler;
        client.DownloadStringAsync(address);

        return tcs.Task;
    }

Discussion

This particular example isn’t very useful because WebClient already defines a DownloadStringTaskAsync and there is a more async-friendly HttpClient that could also be used. However, this same technique can be used to interface with older asynchronous code that has not yet been updated to use Task.

TIP

For new code, always use HttpClient. Only use WebClient if you’re working with legacy code.

Normally, a TAP method for downloading strings would be named OperationAsync (e.g., DownloadStringAsync); however, that naming convention won’t work in this case because EAP already defines a method with that name. In this case, the convention is to name the TAP method OperationTaskAsync (e.g., DownloadStringTaskAsync).

When wrapping EAP methods, there is the possibility that the “start” method may throw an exception; in the previous example, DownloadStringAsync may throw. In that case, you’ll need to decide whether to allow the exception to propagate, or catch the exception and call TrySetException. Most of the time, exceptions thrown at that point are usage errors, so it doesn’t matter which option you choose.
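To make the second option concrete, here is a minimal sketch that catches an exception from the “start” method and places it on the task. To keep it runnable without a network, it wraps a hypothetical EAP component (LegacyFetcher, FetchAsync, FetchCompleted, and FetchCompletedEventArgs are all invented for illustration); only TaskCompletionSource<TResult> and AsyncCompletedEventArgs are real .NET types.

```csharp
using System;
using System.ComponentModel;
using System.Threading.Tasks;

// Hypothetical EAP-style event args (not a real library type).
public sealed class FetchCompletedEventArgs : AsyncCompletedEventArgs
{
    private readonly string _result;

    public FetchCompletedEventArgs(string result, Exception error, bool cancelled)
        : base(error, cancelled, null)
    {
        _result = result;
    }

    // Like real EAP args, reading Result rethrows if the operation failed.
    public string Result
    {
        get { RaiseExceptionIfNecessary(); return _result; }
    }
}

// Hypothetical EAP component: FetchAsync starts the work, FetchCompleted reports it.
public sealed class LegacyFetcher
{
    public event EventHandler<FetchCompletedEventArgs> FetchCompleted;

    public void FetchAsync(string key)
    {
        // A usage error surfaces synchronously from the "start" method.
        if (key == null)
            throw new ArgumentNullException(nameof(key));

        Task.Run(() =>
        {
            FetchCompleted?.Invoke(this,
                new FetchCompletedEventArgs("value for " + key, null, false));
        });
    }
}

public static class LegacyFetcherExtensions
{
    public static Task<string> FetchTaskAsync(this LegacyFetcher fetcher, string key)
    {
        var tcs = new TaskCompletionSource<string>();

        EventHandler<FetchCompletedEventArgs> handler = null;
        handler = (_, e) =>
        {
            fetcher.FetchCompleted -= handler;
            if (e.Cancelled)
                tcs.TrySetCanceled();
            else if (e.Error != null)
                tcs.TrySetException(e.Error);
            else
                tcs.TrySetResult(e.Result);
        };

        fetcher.FetchCompleted += handler;
        try
        {
            fetcher.FetchAsync(key);
        }
        catch (Exception ex)
        {
            // The operation never started, so the event will never fire:
            // unregister the handler and fault the task instead.
            fetcher.FetchCompleted -= handler;
            tcs.TrySetException(ex);
        }

        return tcs.Task;
    }
}
```

With this choice, callers see every failure when they await the returned task, rather than some failures being thrown directly from the wrapper. If you let the exception propagate instead, remember to unregister the handler first.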

See Also

Recipe 7.2 covers TAP wrappers for APM methods (BeginOperation and EndOperation).

Recipe 7.3 covers TAP wrappers for any kind of notification.

7.2. Async Wrappers for “Begin/End” Methods

Problem

There is an older asynchronous pattern that uses pairs of methods named BeginOperation and EndOperation, with the IAsyncResult representing the asynchronous operation. You have an operation that follows this pattern and wish to consume it with await.

TIP

The BeginOperation and EndOperation pattern is called the Asynchronous Programming Model (APM). We’re going to wrap those into a Task-returning method that follows the Task-based Asynchronous Pattern (TAP).

Solution

The best approach for wrapping APM is to use one of the FromAsync methods on the TaskFactory type. FromAsync uses TaskCompletionSource<TResult> under the hood, but when you’re wrapping APM, FromAsync is much easier to use.

The following example defines an extension method for WebRequest that sends an HTTP request and gets the response. The WebRequest type defines BeginGetResponse and EndGetResponse; we can define a GetResponseAsync method as such:

    public static Task<WebResponse> GetResponseAsync(this WebRequest client)
    {
        return Task<WebResponse>.Factory.FromAsync(client.BeginGetResponse,
            client.EndGetResponse, null);
    }

Discussion

FromAsync has a downright confusing number of overloads!

As a general rule, it’s best to call FromAsync like the example. First, pass the BeginOperation method (without calling it), then the EndOperation method (without calling it). Next, pass all arguments that BeginOperation takes, except for the last AsyncCallback and object arguments. Finally, pass null.

In particular, do not call the BeginOperation method before calling FromAsync. It is possible to call FromAsync, passing the IAsyncResult that you get from BeginOperation, but if you call it this way, FromAsync is forced to use a less performant implementation.

You might be wondering why the recommended pattern always passes a null at the end. FromAsync was introduced along with the Task type in .NET 4.0, before async was around. At that time, it was common to use state objects in asynchronous callbacks, and the Task type supports this via its AsyncState member. In the new async pattern, state objects are no longer necessary.
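The GetResponseAsync example has no arguments besides the callback and state, so it does not show where the operation’s own arguments go. As a sketch of the general rule, the following wraps Stream.BeginRead and Stream.EndRead, which take a buffer, an offset, and a count. The class name StreamApmExtensions and the method name ReadTaskAsync are hypothetical, chosen only because Stream already defines ReadAsync.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class StreamApmExtensions
{
    // Hypothetical name: Stream already has a built-in ReadAsync.
    public static Task<int> ReadTaskAsync(this Stream stream,
        byte[] buffer, int offset, int count)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead,      // BeginOperation, passed without calling it
            stream.EndRead,        // EndOperation, passed without calling it
            buffer, offset, count, // BeginRead's arguments, minus callback and state
            null);                 // the state object, which await-based code does not need
    }
}
```

A caller would then write something like int bytesRead = await stream.ReadTaskAsync(buffer, 0, buffer.Length);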

See Also

Recipe 7.3 covers writing TAP wrappers for any kind of notification.

7.3. Async Wrappers for Anything

Problem

You have an unusual or nonstandard asynchronous operation or event and wish to consume it via await.

Solution

The TaskCompletionSource<T> type can be used to construct Task<T> objects in any scenario. Using a TaskCompletionSource<T>, you can complete a task in three different ways: with a successful result, faulted, or canceled.
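As a quick illustration of those three outcomes, the sketch below completes one task each way. The class and method names are hypothetical; the TrySet* methods and the TaskStatus values are part of .NET.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionStates
{
    // Completes successfully: the task ends in TaskStatus.RanToCompletion.
    public static Task<int> Succeeded()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.TrySetResult(13);
        return tcs.Task;
    }

    // Completes with an exception: the task ends in TaskStatus.Faulted.
    public static Task<int> Faulted()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.TrySetException(new InvalidOperationException("The operation failed."));
        return tcs.Task;
    }

    // Completes as canceled: the task ends in TaskStatus.Canceled.
    public static Task<int> Canceled()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.TrySetCanceled();
        return tcs.Task;
    }
}
```

Awaiting the first task yields 13, awaiting the second rethrows the InvalidOperationException, and awaiting the third throws an OperationCanceledException.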

Before async was on the scene, there were two other asynchronous patterns recommended by Microsoft: APM (which we looked at in Recipe 7.2) and EAP (Recipe 7.1). However, both APM and EAP were rather awkward and in some cases difficult to get right. So, an unofficial convention arose that used callbacks, with methods like this:

    public interface IMyAsyncHttpService
    {
        void DownloadString(Uri address, Action<string, Exception> callback);
    }

Methods like these follow the convention that DownloadString will start the (asynchronous) download, and when it completes, the callback is invoked with either the result or the exception. Usually, callback is invoked on a background thread.

This nonstandard kind of asynchronous method can also be wrapped using TaskCompletionSource<T> so that it naturally works with await:

    public static Task<string> DownloadStringAsync(
        this IMyAsyncHttpService httpService, Uri address)
    {
        var tcs = new TaskCompletionSource<string>();
        httpService.DownloadString(address, (result, exception) =>
        {
            if (exception != null)
                tcs.TrySetException(exception);
            else
                tcs.TrySetResult(result);
        });
        return tcs.Task;
    }

Discussion

This same pattern can be used with TaskCompletionSource<T> to wrap any asynchronous method, no matter how nonstandard. Create the TaskCompletionSource<T> instance first. Next, arrange a callback so that the TaskCompletionSource<T> completes its task appropriately. Then, start the actual asynchronous operation. Finally, return the Task<T> that is attached to that TaskCompletionSource<T>.

One important aspect of this pattern is that you must make sure that the TaskCompletionSource<T> is always completed. Think through your error handling in particular, and ensure that the TaskCompletionSource<T> will be completed appropriately. In the last example, exceptions are explicitly passed into the callback, so we don’t need a catch block; but some nonstandard patterns might need you to catch exceptions in your callbacks and place them on the TaskCompletionSource<T>.
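Here is a sketch of a case that does need catch blocks. It assumes a hypothetical callback API (LegacySensor.ReadRaw, invented for illustration) that reports only a raw string and has no error channel, so any failure while converting the value has to be caught in the callback and placed on the TaskCompletionSource<T>.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical callback API: reports only a raw string, with no error channel.
public sealed class LegacySensor
{
    private readonly string _raw;

    public LegacySensor(string raw)
    {
        _raw = raw;
    }

    public void ReadRaw(Action<string> callback)
    {
        // Invoke the callback on a background thread.
        Task.Run(() => callback(_raw));
    }
}

public static class LegacySensorExtensions
{
    public static Task<int> ReadAsync(this LegacySensor sensor)
    {
        var tcs = new TaskCompletionSource<int>();
        try
        {
            sensor.ReadRaw(raw =>
            {
                try
                {
                    tcs.TrySetResult(int.Parse(raw));
                }
                catch (Exception ex)
                {
                    // Without this catch, a parse failure would leave
                    // the task incomplete forever.
                    tcs.TrySetException(ex);
                }
            });
        }
        catch (Exception ex)
        {
            // The operation failed to start, so the callback will never run.
            tcs.TrySetException(ex);
        }
        return tcs.Task;
    }
}
```

Every path through the wrapper ends in one of the TrySet* calls, which is the property to check when you review a wrapper of your own.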

See Also

Recipe 7.1 covers TAP wrappers for EAP members (OperationAsync, OperationCompleted).

Recipe 7.2 covers TAP wrappers for APM members (BeginOperation, EndOperation).

7.4. Async Wrappers for Parallel Code

Problem

You have (CPU-bound) parallel processing that you wish to consume using await. Usually, this is desirable so that your UI thread does not block waiting for the parallel processing to complete.

Solution

The Parallel type and Parallel LINQ use the thread pool to do parallel processing. They will also include the calling thread as one of the parallel processing threads, so if you call a parallel method from the UI thread, the UI will be unresponsive until the processing completes.

To keep the UI responsive, wrap the parallel processing in a Task.Run and await the result:

await Task.Run(() => Parallel.ForEach(...));

The key behind this recipe is that parallel code includes the calling thread in its pool of threads that it uses to do the parallel processing. This is true for both Parallel LINQ and the Parallel class.

Discussion

This is a simple recipe but one that is often overlooked. By using Task.Run, you are pushing all of the parallel processing off to the thread pool. Task.Run returns a Task that then represents that parallel work, and the UI thread can (asynchronously) wait for it to complete.
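When the parallel work produces a value, Task.Run can carry it back to the awaiting code. The following is a minimal sketch under that assumption; ParallelWork, SumOfSquares, and OnComputeClickedAsync are hypothetical names, with the last one standing in for a UI event handler.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelWork
{
    // CPU-bound and blocking: Parallel LINQ uses the calling thread
    // along with thread pool threads.
    public static long SumOfSquares(IReadOnlyList<int> values)
    {
        return values.AsParallel().Sum(v => (long)v * v);
    }

    // Stand-in for a UI event handler (hypothetical).
    public static async Task<long> OnComputeClickedAsync(IReadOnlyList<int> values)
    {
        // The parallel query runs entirely on the thread pool;
        // the caller waits for it asynchronously.
        long total = await Task.Run(() => SumOfSquares(values));
        return total;
    }
}
```

Note that Task.Run is applied at the call site, in the UI-level code, while SumOfSquares itself stays synchronous.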

This recipe only applies to UI code. On the server side (e.g., ASP.NET), parallel processing is rarely done. Even if you do perform parallel processing, you should invoke it directly, not push it off to the thread pool.

See Also

Chapter 3 covers the basics of parallel code.

Chapter 2 covers the basics of asynchronous code.

7.5. Async Wrappers for Rx Observables

Problem

You have an observable stream that you wish to consume using await.

Solution

First, you need to decide which of the observable events in the event stream you’re interested in. The common situations are:

• The last event before the stream ends
• The next event
• All the events

To capture the last event in the stream, you can either await the result of LastAsync or just await the observable directly:

    IObservable<int> observable = ...;
    int lastElement = await observable.LastAsync();
    // or: int lastElement = await observable;

When you await an observable or LastAsync, the code (asynchronously) waits until the stream completes and then returns the last element. Under the covers, the await is subscribing to the stream.

To capture the next event in the stream, use FirstAsync. In this case, the await subscribes to the stream and then completes (and unsubscribes) as soon as the first event arrives:

    IObservable<int> observable = ...;
    int nextElement = await observable.FirstAsync();

To capture all events in the stream, you can use ToList:

    IObservable<int> observable = ...;
    IList<int> allElements = await observable.ToList();

Discussion

The Rx library provides all the tools you need to consume streams using await. The only tricky part is that you have to think about whether the awaitable will wait until the stream completes. Of the examples in this recipe, LastAsync, ToList, and the direct await will wait until the stream completes; FirstAsync will only wait for the next event.

If these examples don’t satisfy your needs, remember that you have the full power of LINQ as well as the new Rx manipulators. Operators such as Take and Buffer can also help you asynchronously wait for the elements you need without having to wait for the entire stream to complete.
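As a sketch of that idea, the following combines Take with ToList to wait for only the first few events. It assumes the System.Reactive package is referenced; PartialStreamAwait and FirstFewAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class PartialStreamAwait
{
    // Completes once `count` events have arrived, even if the
    // source stream itself never completes.
    public static async Task<IList<int>> FirstFewAsync(
        IObservable<int> source, int count)
    {
        return await source.Take(count).ToList();
    }
}
```

Take completes its output stream after the requested number of events, and that completion is what allows the awaited ToList to finish.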

Some of the operators for use with await—such as FirstAsync and LastAsync—do not actually return a Task<T>. If you plan to use Task.WhenAll or Task.WhenAny, then you’ll need an actual Task<T>, which you can get by calling ToTask on any observable. ToTask will return a Task<T> that completes with the last value in the stream.
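A minimal sketch of ToTask combined with Task.WhenAll might look like the following. It assumes the System.Reactive package is referenced (ToTask lives in the System.Reactive.Threading.Tasks namespace); ObservableTasks and LastOfEachAsync are hypothetical names.

```csharp
using System;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ObservableTasks
{
    // Waits for both streams to complete and returns the last value of each.
    public static async Task<int[]> LastOfEachAsync(
        IObservable<int> first, IObservable<int> second)
    {
        // ToTask subscribes immediately and yields a real Task<int>.
        Task<int> firstTask = first.ToTask();
        Task<int> secondTask = second.ToTask();

        return await Task.WhenAll(firstTask, secondTask);
    }
}
```

Because ToTask completes with the last value, both streams must complete before the combined task does.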

See Also

Recipe 7.6 covers using asynchronous code within an observable stream.

Recipe 7.7 covers using observable streams as an input to a dataflow block (which can perform asynchronous work).

Recipe 5.3 covers windows and buffering for observable streams.

7.6. Rx Observable Wrappers for async Code

Problem

You have an asynchronous operation that you want to combine with an observable.

Solution

Any asynchronous operation can be treated as an observable stream that either:

• Produces a single element and then completes
• Faults without producing any elements

The Rx library includes a simple conversion from Task<T> to IObservable<T> that implements this transformation. The following code starts an asynchronous download of a web page, treating it as an observable sequence:

    var client = new HttpClient();
    IObservable<HttpResponseMessage> response =
        client.GetAsync("http://www.example.com/")
            .ToObservable();

The ToObservable approach assumes you have already called the async method and have a Task to convert.

Another approach is to call StartAsync. StartAsync also calls the async method immediately but supports cancellation: if a subscription is disposed of, the async method is canceled:

    var client = new HttpClient();
    IObservable<HttpResponseMessage> response = Observable
        .StartAsync(token => client.GetAsync("http://www.example.com/", token));

Both ToObservable and StartAsync immediately start the asynchronous operation without waiting for a subscription. If you want to create an observable that only starts the operation when subscribed to, you can use FromAsync (which also supports cancellation just like StartAsync):

    var client = new HttpClient();
    IObservable<HttpResponseMessage> response = Observable
        .FromAsync(token => client.GetAsync("http://www.example.com/", token));

FromAsync is notably different from ToObservable and StartAsync. Both ToObservable and StartAsync return an observable for an async operation that has already started. FromAsync starts a new, independent async operation every time it is subscribed to.
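One way to see this difference is to count how many times the operation starts. The sketch below assumes the System.Reactive package is referenced; StartCounting, OperationAsync, and CompareAsync are hypothetical names, and the operation is a stand-in that returns a constant rather than doing real I/O.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class StartCounting
{
    private static int _starts;

    // Hypothetical async operation that records each time it is started.
    private static async Task<int> OperationAsync()
    {
        Interlocked.Increment(ref _starts);
        await Task.Yield();
        return 42;
    }

    // Returns the start count observed at four points in time.
    public static async Task<int[]> CompareAsync()
    {
        _starts = 0;

        IObservable<int> started = Observable.StartAsync(() => OperationAsync());
        int afterStartAsync = _starts;       // started without any subscriber

        IObservable<int> deferred = Observable.FromAsync(() => OperationAsync());
        int afterFromAsync = _starts;        // unchanged: nothing has subscribed

        await started;                       // each await subscribes once
        await started;
        int afterAwaitingStarted = _starts;  // unchanged: same operation observed twice

        await deferred;
        await deferred;
        int afterAwaitingDeferred = _starts; // two more starts, one per subscription

        return new[]
        {
            afterStartAsync, afterFromAsync,
            afterAwaitingStarted, afterAwaitingDeferred
        };
    }
}
```

Under these assumptions, CompareAsync should report counts of 1, 1, 1, and 3: one start for StartAsync no matter how many times it is awaited, and one additional start for each subscription to the FromAsync observable.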

Finally, you can use special overloads of SelectMany to start asynchronous operations for each event in a source stream as they arrive. SelectMany also supports cancellation.

The following example takes an existing event stream of URLs and then initiates a request as each URL arrives:

    IObservable<string> urls = ...;
    var client = new HttpClient();
    IObservable<HttpResponseMessage> responses = urls
        .SelectMany((url, token) => client.GetAsync(url, token));

Discussion

Reactive Extensions existed before the introduction of async but added these operators (and others) so that it could interoperate well with async code. I recommend that you use the operators described even though you can build the same functionality using other Rx operators.

See Also

Recipe 7.5 covers consuming observable streams with asynchronous code.

Recipe 7.7 covers using dataflow blocks (which can contain asynchronous code) as sources of observable streams.

7.7. Rx Observables and Dataflow Meshes

Problem

Part of your solution uses Rx observables, and part of your solution uses dataflow meshes, and you need them to communicate.

Rx observables and dataflow meshes each have their own uses, with some conceptual overlap; this recipe shows how easily they work together so you can use the best tool for each part of the job.

Solution

First, let’s consider using a dataflow block as an input to an observable stream. The following code creates a buffer block (which does no processing) and creates an observable interface from that block by calling AsObservable:

    var buffer = new BufferBlock<int>();
    IObservable<int> integers = buffer.AsObservable();
    integers.Subscribe(data => Trace.WriteLine(data),
        ex => Trace.WriteLine(ex),
        () => Trace.WriteLine("Done"));

    buffer.Post(13);

Buffer blocks and observable streams can be completed normally or with error, and the AsObservable method will translate the block completion (or fault) into the completion of the observable stream. However, if the block faults with an exception, that exception will be wrapped in an AggregateException when it is passed to the observable stream. This is similar to how linked blocks propagate their faults.

It is only a little more complicated to take a mesh and treat it as a destination for an observable stream. The following code calls AsObserver to allow a block to subscribe to an observable stream:

    IObservable<DateTimeOffset> ticks =
        Observable.Interval(TimeSpan.FromSeconds(1))
            .Timestamp()
            .Select(x => x.Timestamp)
            .Take(5);

    var display = new ActionBlock<DateTimeOffset>(x => Trace.WriteLine(x));
    ticks.Subscribe(display.AsObserver());

    try
    {
        display.Completion.Wait();
        Trace.WriteLine("Done.");
    }
    catch (Exception ex)
    {
        Trace.WriteLine(ex);
    }

Just as before, the completion of the observable stream is translated to the completion of the block, and any errors from the observable stream are translated to a fault of the block.

Discussion

Dataflow blocks and observable streams share a lot of conceptual ground. They both have data pass through them, and they both understand completion and faults. They were designed for different scenarios; TPL Dataflow is intended for a mixture of asynchronous and parallel programming, while Rx is intended for reactive programming. However, the conceptual overlap is compatible enough that they work very well and naturally together.

See Also

Recipe 7.5 covers consuming observable streams with asynchronous code.

Recipe 7.6 covers using asynchronous code within an observable stream.