Applied .NET Task Parallel Library - Real World .NET, C#, and Silverlight: Indispensible Experiences from 15 MVPs (2012)


Chapter 11

Applied .NET Task Parallel Library

by Jeffrey Juday

It has become a multicore world, and parallel programming must be embraced for a developer to capitalize on all the burgeoning multicore potential. Embracing parallel programming in .NET means a developer must learn about the Task Parallel Library (TPL).

Although .NET has always had features associated with concurrency and parallel algorithms, such as threading and mutexes, the goal of the TPL is to make concurrent and parallel construction even easier. Borrowing from academia and commercial best practices, and incorporating some unique innovations, the TPL was assembled to simplify .NET parallel programming. The TPL introduces some new constructs and sports an improved thread-pooling mechanism.

Parallel programming is often divided into data parallelism and task parallelism. Data parallelism in .NET is encapsulated in parallel loops and Parallel LINQ (PLINQ). Underpinning parallel loops and PLINQ are all the TPL task parallelism classes. Although a developer could leverage parallel loops and PLINQ without delving into all the TPL classes, diverging from simple scenarios requires some deeper understanding. That deeper data parallelism understanding actually entails learning how to work with a core set of TPL classes.
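To give a taste of the data-parallel side before the chapter turns to tasks, the following is a minimal sketch of PLINQ. The numbers and workload are hypothetical, chosen only to show that the AsParallel call is the sole change from a serial LINQ query:

```csharp
using System;
using System.Linq;

class PlinqSketch
{
    static void Main()
    {
        var numbers = Enumerable.Range(1, 1000);

        // Serial LINQ query...
        long serial = numbers.Sum(n => (long)n * n);

        // ...and the data-parallel version: AsParallel() is the only change.
        long parallel = numbers.AsParallel().Sum(n => (long)n * n);

        Console.WriteLine(serial == parallel);   // True
    }
}
```

Both queries produce the same total; only the execution strategy differs, with PLINQ partitioning the range across available processors.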

That same reasoning goes for learning task parallelism. A developer can leverage task parallelism with a few TPL classes. Again, though, diverging from well-defined scenarios requires understanding some core TPL classes.

By demonstrating class usage and common operations, this chapter introduces the core TPL classes.

Like all new .NET features, the TPL has a learning curve. An introduction using the core classes to solve a common problem can flatten the TPL learning curve. Developers reading the discussions and studying the sample code presented here will learn about the following concepts:

· Task class

· Concurrent collections

· TPL approaches to exception handling

· Approaches to canceling parallel operations

· Implementing the actor/agent design pattern

· Continuations

· Configuring a parallel workload

Problems and Solutions

A good way to learn how to apply new components is to solve a common problem. Often, it helps to start with a common solution to a common problem, and then move to the new approaches to the problem. Recursively walking a directory hierarchy and totaling all the bytes occupied by a particular file type is a common problem most developers are already familiar with. Code based on a recursive algorithm is a common solution to the problem.

Typically, the algorithm walks a tree resembling the shape shown in Figure 11.1.

Figure 11.1 Typical directory structure


Recursing a directory tree takes a divide-and-conquer approach. It starts at the root, chooses a path, walks down each branch, and then backtracks to do the same thing down another path. The following snippet shows recursive code serially traversing a directory tree and totaling the number of bytes occupied by .docx files:

public void Run()

{

var totalBytes = Visit(_root);

Console.WriteLine("Total bytes == " + totalBytes );

}

private long Visit(string subDir)

{

long totalBytes = 0;

try

{

var dir = new DirectoryInfo(subDir);

var children = dir.EnumerateDirectories().Select(info =>

{ return info.FullName; });

long bytes = 0;

bytes = dir.EnumerateFiles("*." + _extension).Sum(f =>

{ return f.Length; });

totalBytes = totalBytes + bytes;

foreach (var directory in children)

{

totalBytes = totalBytes + Visit(directory);

}

}

catch (Exception ex)

{

Console.WriteLine("EXCEPTION: " + ex.Message +

" skipping to next folder");

}

return totalBytes;

}

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

The DirectoryInfo class exposes methods and properties for the directory associated with the path supplied in the DirectoryInfo constructor. EnumerateDirectories returns an IEnumerable of child DirectoryInfo instances. EnumerateFiles returns a collection of FileInfo instances contained in the directory. A filter parameter restricts the result to only the FileInfo instances with the particular extension (.docx). Sum is a LINQ aggregate function that operates on any numeric property or function. Notice the recursive call to Visit. The whole process terminates when the root Visit completes.

The solution works fine. However, the solution is slow, and doesn't utilize all the resources of a modern personal computer. The solution runs on a single thread, and will occupy only one processor. Modern systems have more than one processor. An ideal solution would spread the work across all available processors.

So, now you understand this common problem. The remainder of this chapter solves the problem applying two different TPL solutions. Each approach to the problem introduces a new set of core TPL classes. One example, called TPLParentChild, closely mimics the recursive execution in the directory recursion example just discussed. A second example, called TPLContinuation, exhibits a completely different style altogether.

Each example is a conceptual leap from the original recursive example presented earlier. It's difficult to appreciate the need for a data structure or component without seeing a legitimate use for it. So, all solutions are fully working applications. Emphasis is placed on how to apply the TPL, rather than on simply explaining the various classes.

Although totaling the bytes occupied by .docx files may seem trivial, the patterns and approaches to the problem mimic common patterns and approaches in other TPL solutions. TPL solutions often have a recursive nature, applying the same divide-and-conquer approach that the following solutions describe.

Often, a TPL solution performs an operation on a tree or hierarchical data structure such as a Directed Acyclic Graph (DAG). A directory structure is a data structure that most developers are familiar with, and it also happens to be based on a DAG: each directory is a vertex, and the edges are the parent-to-subdirectory relationships. In other words, these approaches and patterns apply to real-world problems.

Using Tasks

To leverage the TPL, developers must partition the parallel portions of an application into tasks. Tasks are a sort of sheath for a class method, lambda expression, Func<T>, Action, or the result of an operation. Tasks represent a workload, and the result of the executed workload.

To envision a need for a task, consider what happens when a method executes. Internally, the method may declare variables and do some computation. A method may or may not return a value. It may also generate an exception.

Now, consider what would be important to know if a developer wants to ship off a method to be executed in some other part of the application. Aside from the result (exception or value), a developer may want to know the method's current status. Has the method started running? Is the method waiting to run? What if a developer wants to wait on the completion of a method? What if a developer has more than one method? How can a developer wait on the result of more than one method?

All this functionality is encapsulated in the Task class.

Task Class

Tasks come in two flavors: a Task class and a generic Task<TResult> class. A developer can find the Task and generic Task<TResult> class in the System.Threading.Tasks namespace. Following are some of the methods and properties of the Task and Task<TResult> classes:

public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable

{

public Task(Action action);

public Task(Action<object> action, object state);

public Task(Action action, CancellationToken cancellationToken);

public Task(Action action, TaskCreationOptions creationOptions);

public Task(Action<object> action, object state, CancellationToken

cancellationToken, TaskCreationOptions creationOptions);

public bool IsCanceled { get; }

public bool IsCompleted { get; }

public bool IsFaulted { get; }

public TaskStatus Status { get; }

public Task ContinueWith(Action<Task> continuationAction);

public void RunSynchronously();

public void Start();

public void Wait();

public static void WaitAll(params Task[] tasks);

public static int WaitAny(params Task[] tasks);

}

public class Task<TResult> : Task

{

public Task(Func<TResult> function);

public TResult Result { get; internal set; }

public Task ContinueWith(Action<Task<TResult>> continuationAction);

}

These methods and properties comprise basic task functionality. There are additional task methods that will be dealt with later in this chapter. Task<TResult> inherits from Task. An astute developer may notice that Task implements IAsyncResult. The .NET Asynchronous Programming Model (APM) uses IAsyncResult to coordinate an asynchronous invocation. Though a full discussion of APM is beyond the scope of this chapter, you should realize that tasks have usefulness that goes beyond task parallelism.

Tasks require an Action. Task<TResult> (a generic Task) includes a Result property and, therefore, requires a Func<TResult>. Both Func and Action delegates can be created from a class method or lambda expression.

Following are code examples of Task and Task<TResult> classes constructed with variations matching Func<T> and Action:

Task task = new Task(() =>

{

Console.WriteLine("The Task Ran"); }

);

Task<string> taskWithResult = new Task<string>(() =>

{

return "This task has a result";

}

);

var f = new Func<string>(() =>

{

return "In a func";

}

);

Task<string> taskWithFunc = new Task<string>(f);

Task taskWithClassMethod = new Task(Program.SampleMethod);

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

Almost everything in the TPL revolves around Task and Task<TResult>, which are referred to simply as tasks for the remainder of the chapter. After a task is created, it must be started for the TPL to schedule the task to run. Typically, a developer starts a task and, somewhere in the code, waits for the task to complete. Following are some variations on starting and waiting:

task.Start();

task.Wait();

taskWithResult.Start();

taskWithResult.Wait();

Console.WriteLine(taskWithResult.Result);

taskWithFunc.Start();

taskWithFunc.Wait();

Console.WriteLine(taskWithFunc.Result);

taskWithClassMethod.Start();

taskWithClassMethod.Wait();

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

Later in this chapter, you learn more about variations on waiting and actions that a developer can take in response to a completed task.
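One such variation is WaitAny, which blocks only until the first task in a set completes. The following is a minimal sketch; the task bodies are hypothetical, contrived so that one task finishes long before the other:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class WaitAnySketch
{
    static void Main()
    {
        // Two tasks with very different run times.
        var slow = Task<string>.Factory.StartNew(() =>
        {
            Thread.Sleep(2000);      // simulate a long workload
            return "slow";
        });
        var fast = Task<string>.Factory.StartNew(() => "fast");

        // WaitAny blocks until the first task finishes and
        // returns the index of that task in the array.
        int first = Task.WaitAny(slow, fast);
        Console.WriteLine(first);          // 1 -- the fast task
        Console.WriteLine(fast.Status);    // RanToCompletion
    }
}
```

Checking the Status property afterward shows where each task is in its life cycle; the slow task would still report Running at this point.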

A separate class called TaskScheduler handles and executes tasks. Tasks may be executed on the thread pool, or from within the TaskScheduler. Figure 11.2 shows a task's execution path.

Figure 11.2 Executing a task (with dotted lines depicting the path of a task)


A task can specify a custom TaskScheduler (if one is available), or utilize the default TaskScheduler. The default TaskScheduler maintains a pool of threads and works with the ThreadPool.

Depending on TaskCreationOptions and other conditions, a task can be queued to run in a number of ways. Idle threads can search for work in the global ThreadPool queue, or steal work from another thread's local queue.

Closures

Lambda expressions are a common way to compose tasks. Lambda expressions are often used to implement a Closure. The following code demonstrates a Closure:

static Action RunClosure(int whichOneToRun)

{

var messagePrefix = "You ran ";

Action act = null;

switch (whichOneToRun)

{

case 1:

act = new Action(() =>

{

var msg = messagePrefix + "1";

Console.WriteLine(msg);

}

);

break;

case 2:

act = new Action(() =>

{

var msg = messagePrefix + "2";

Console.WriteLine(msg);

}

);

break;

}

return act;

}

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

Notice how the local variable messagePrefix is utilized by the Action. Although a Closure and lambda expression are not necessary for building tasks, in a simple situation such as the one shown here, a Closure eliminates the need to create a separate private class and use one of the class instance methods.

Although a Closure can make code more readable, care must be exercised with Closures. In the following example, when the Action is invoked, the loop has exited, and the looping variable is not the expected value:

static Action RunClosureBad()

{

Action act = null;

for (int n = 0; n <= 5; ++n)

{

if (n == 2)

{

act = new Action(() =>

{

Console.WriteLine("This should be a 2 but it is a " + n.ToString());

}

);

}

}

return act;

}

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

Because the outer method continues to execute after the Action variable is assigned, n retains the last assigned value. In this situation, that is the value that terminates the loop.

A better approach is to restructure the code so that it looks similar to the following sample:

Action act = null;

for (int n = 0; n <= 5; ++n)

{

if (n == 2)

{

var nMoreLocal = n;

act = new Action(() =>

{

Console.WriteLine("This should be a 2 and it is a "

+ nMoreLocal.ToString());

}

);

}

}

return act;

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

Notice how the nMoreLocal variable “caches” the value for the expression to use when it executes.

When implementing a Closure, developers must also beware of using statements, which invoke the Dispose method once control leaves the confines of the using block. Running past the end of the block and an exception propagating out of it are the two ways code leaves a using block.

So, in the following example, although the obj variable is still technically “in scope,” it is essentially useless because its Dispose method has been called before the variable is used:

class SampleDisposed : IDisposable

{

public string MyVal { get; set; }

public SampleDisposed(string myVal) { this.MyVal = myVal; }

public void Dispose()

{

this.MyVal = "Dispose has been called";

}

}

static Action RunClosureBad()

{

Action act = null;

using (var obj = new SampleDisposed("Some value is here"))

{

for (int n = 0; n <= 5; ++n)

{

if (n == 2)

{

act = new Action(() =>

{

Console.WriteLine("This should say Some value is here but it says " + obj.MyVal);

}

);

}

}

}

return act;

}

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

This example is trivial, but had the using statement been for one of the many more complex classes in the .NET Framework that implement IDisposable, the expression would have undoubtedly generated an exception.

The following code fixes the using statement issues:

Action act = null;

for (int n = 0; n <= 5; ++n)

{

if (n == 2)

{

var nMoreLocal = n;

act = new Action(() =>

{

using (var obj = new

SampleDisposed("Some value is here"))

{

Console.WriteLine("This should be a 2 and it is a "

+ nMoreLocal.ToString());

Console.WriteLine("This should say Some value is here and it does say " + obj.MyVal);

}

}

);

}

}

return act;

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

The using statement was moved inside of the lambda expression and the result better aligns with the developer's intent.

Applying Tasks

Putting tasks to work starts with thinking about a task's workload. Tasks can be short bursts of work, or can execute for the lifetime of a process. As discussed earlier, a separate TPL class called TaskScheduler handles scheduling and executing a task on a thread pool. A developer injects TaskScheduler “hints” when creating a task. Taking the hints into account, along with the resources available on the machine, the TaskScheduler adjusts the thread pool.

A default TaskScheduler is included in the TPL. Building a custom TaskScheduler is beyond the scope of this chapter. For reference, here is the original recursive example from earlier in this chapter:

public void Run()

{

var totalBytes = Visit(_root);

Console.WriteLine("Total bytes == " + totalBytes );

}

private long Visit(string subDir)

{

long totalBytes = 0;

try

{

var dir = new DirectoryInfo(subDir);

var children = dir.EnumerateDirectories().Select(info => {

return info.FullName; });

long bytes = 0;

bytes = dir.EnumerateFiles("*." + _extension).Sum(f => { return

f.Length; });

totalBytes = totalBytes + bytes;

foreach (var directory in children)

{

totalBytes = totalBytes + Visit(directory);

}

}

catch (Exception ex)

{

Console.WriteLine("EXCEPTION: " + ex.Message + " skipping to next folder");

}

return totalBytes;

}

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

Following is part of a parallel programming solution to the recursive computation discussed earlier in this chapter:

public void Run()

{

StartGettingInput();

var totalBytes = Visit(_root,TaskCreationOptions.LongRunning);

//Root is long running

Console.WriteLine("Total bytes == " + totalBytes );

}

private long Visit(string subDir, TaskCreationOptions opts)

{

CancellationTokenSource cancelLocal =

new CancellationTokenSource();

var cancelJoinedToken =

CancellationTokenSource.CreateLinkedTokenSource

(_cancelGlobal.Token, cancelLocal.Token).Token;

var task = new Task<long> ( new Func<long> (() =>

{

long bytes = 0;

long bytesChildren = 0;

cancelJoinedToken.ThrowIfCancellationRequested();

var dir = new DirectoryInfo(subDir);

var children = dir.EnumerateDirectories().Select(info => {

return info.FullName; });

bytes = dir.EnumerateFiles("*." + _extension).Sum(f => {

return f.Length; });

List<Task<long>> childTasks = new List<Task<long>>();

//Run children

foreach (var directory in children)

{

var localDir = directory;

var tNew = Task.Factory.StartNew<long>(obj =>

{

return Visit(localDir,

TaskCreationOptions.AttachedToParent);

}

, cancelJoinedToken,

TaskCreationOptions.AttachedToParent);

childTasks.Add(tNew);

}

//Get the child results

if (childTasks.Count > 0)

{

Task.WaitAll(childTasks.ToArray());

//If you get a cancel or exception the line below is

//never executed.

bytesChildren = childTasks.Sum(t => { return t.Result; });

}

else { bytesChildren = 0; }

return bytes + bytesChildren;

}

)

, cancelJoinedToken, opts);

return this.RunVisit(task,cancelLocal);

}

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

Some of this code should look familiar. The computation is the same, and the directory query is the same. The new parts are the task creation, the task waiting, and a concept called cancellations that is examined later in this chapter. StartNew (a method on TaskFactory, reached through the static Task.Factory property) creates and immediately starts a task.

Figure 11.3 shows what conceptually happens when the code runs.

Figure 11.3 Running TPLParent sample


The code walks the directory tree just like the original recursive example. The main difference is that, instead of moving down one path, backing upward, and then down another path, the code simultaneously fans out down all paths. Each visit to child directories blocks a parent task until all child tasks return a result. Consider what the number of blocked parent tasks means to the TPL when it must schedule hundreds of tasks to run on a much smaller pool of threads. How does the TPL decide what to run first? The TPL is not aware of the underlying nature of the directory tree.

This is where the task creation “hints” come into play. The following code snippet shows some of the TaskCreationOptions (hints) available to a developer.

public enum TaskCreationOptions

{

None = 0,

PreferFairness = 1,

LongRunning = 2,

AttachedToParent = 4,

}

TaskCreationOptions provide execution guidelines to the TPL. Being guidelines, the options are not guaranteed. Rather, options should be considered TPL suggestions. Other TPL execution semantics may override the options. You learn more about why the TPL may want to override the guidelines later in this chapter.

In the sample, AttachedToParent suggests to the TPL that the result of the parent task is based on the result of the created task. Creating the linkage means that the TPL could execute a child task on the same thread as the parent task.

So, for example, child tasks and the child tasks of a child task may all execute on the same thread. Because the solution may create hundreds of tasks, running a child on the same thread eliminates the need to occupy multiple blocking threads all waiting on their child tasks to complete. The TPL documentation refers to this behavior as inlining a task.

As stated earlier, a parent task blocks until its children complete. In the sample, childTasks stores the children. WaitAll does just what its name would have you believe. It blocks on the WaitAll call until all of the tasks complete. After all tasks in the childTasks variable each execute the function that returns the total number of bytes occupied by files with the .docx extension, the value can be retrieved from the Result property.

The root parent task is created with the LongRunning TaskCreationOption. This hint indicates to the TPL that this task may need its own thread. Because values are totaled at each child, the total of the root's children is the total for all subdirectories. When children of the parent root complete, execution is complete.
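The parent-child linkage can be seen in isolation with a minimal sketch. The workload (a short sleep plus a counter increment) is hypothetical, chosen only to make the implicit wait on attached children observable:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class AttachedToParentSketch
{
    static void Main()
    {
        int completedChildren = 0;

        var parent = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 3; ++i)
            {
                // AttachedToParent links each child's life cycle to the
                // parent: the parent does not reach a final state until
                // all three children finish.
                Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(100);   // simulate work
                    Interlocked.Increment(ref completedChildren);
                }, TaskCreationOptions.AttachedToParent);
            }
        });

        // Waiting on the parent implicitly waits on the attached children.
        parent.Wait();
        Console.WriteLine(completedChildren);   // 3
    }
}
```

Had the children been created with TaskCreationOptions.None, parent.Wait() could return before any child ran, and the counter would be unreliable.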

Understanding TPL-Style Exception Handling

Although specifying a task relationship is important for performance reasons, a parent-child relationship helps in ways beyond performance, in particular with exception handling.

Understanding AggregateException

Parallel programming exception handling addresses unique challenges. Unlike a serially executing application where only one method may be executing at a time, a parallel application may have hundreds of simultaneously executing methods. That means that a parallel application could simultaneously generate hundreds of exceptions.

AggregateException is the TPL's answer to packaging all those exceptions. Some TPL methods act like collection points for Try/Catch blocks. For example, the Wait methods utilized earlier exhibit this exception-collecting behavior.

Now take a quick look at AggregateException. Following are some methods of the AggregateException class:

public class AggregateException : Exception

{

public AggregateException(string message);

public AggregateException(string message,

params Exception[] innerExceptions);

public AggregateException Flatten();

public void Handle(Func<Exception, bool> predicate);

}

Like all exceptions, AggregateException inherits from the base Exception class. An AggregateException may also include InnerExceptions and, therefore, stacks of exceptions. The Flatten and Handle methods are what set AggregateException apart. Flatten recurses through the InnerExceptions, finding all the other AggregateExceptions, and builds a new AggregateException composed of just the non-AggregateExceptions. Handle invokes the Func<Exception, bool> passed to it on each of the Exceptions contained in the class.
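A minimal sketch of Flatten and Handle follows; the two failing tasks and their exception messages are hypothetical, contrived so that WaitAll collects more than one exception:

```csharp
using System;
using System.Threading.Tasks;

class AggregateSketch
{
    static void Main()
    {
        // Two tasks that each fail with a different exception type.
        var t1 = Task.Factory.StartNew(() =>
            { throw new InvalidOperationException("bad op"); });
        var t2 = Task.Factory.StartNew(() =>
            { throw new ArgumentException("bad arg"); });

        try
        {
            Task.WaitAll(t1, t2);   // collects both exceptions
        }
        catch (AggregateException agExp)
        {
            // Flatten unwraps any nested AggregateExceptions; Handle marks
            // each inner exception "observed" when the predicate returns true.
            agExp.Flatten().Handle(ex =>
            {
                Console.WriteLine(ex.GetType().Name + ": " + ex.Message);
                return true;
            });
        }
    }
}
```

Returning false from the predicate for any inner exception causes Handle to rethrow those unhandled exceptions in a new AggregateException.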

Like any exception, unhandled TPL exceptions can crash an application. The TPL imposes some lighter restrictions on how it interprets an unhandled exception.

TPL documentation uses the term observed exception. Observed exceptions are considered handled. When the Exception property of the task generating the exception is examined, the exception is considered “observed” and, therefore, handled. An exception is also considered “observed” when the Handle method on an AggregateException labels the exception as handled. (You learn more about this later in this chapter.)

Implementing Exception Handling

Following is some exception-handling code from the TPLParentChild sample:

private long RunVisit(Task<long> task, CancellationTokenSource cancelLocal)

{

try

{

task.Start();

this.ExecuteOnInput(cancelLocal);

task.Wait();

return task.Result;

}

catch (AggregateException agExp)

{

agExp.Flatten().Handle(this.HandleException);

return 0;

}

}

private bool HandleException(Exception ex)

{

if (ex is TaskCanceledException)

{ Console.WriteLine("Task was cancelled"); }

else

{

Console.WriteLine("EXCEPTION: " + ex.Message);

}

return true;

}

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

The TPL channels an executing task's exceptions back to the point in code performing a Wait or WaitAll on a task or collection of tasks. Task.WaitAny does not exhibit the same behavior: unlike Wait and WaitAll, WaitAny doesn't funnel exceptions to the Catch portion of the Try/Catch block. Otherwise, Try/Catch can be employed just like any other .NET exception handling.

Flatten may at first seem odd. However, as alluded to earlier, specifying TaskCreationOptions.AttachedToParent tells the TPL that exceptions can be aggregated up to the parent task. For the directory .docx totaling sample, this means AggregateExceptions with InnerExceptions of AggregateExceptions nested scores of layers deep could be created. Without Flatten, the handling code would have to recurse through each nested AggregateException, whether or not the exception was important to examine.

As seen in the previous example, Handle invokes a delegate on each InnerException. If the invoked delegate returns true, the exception is considered “observed” and, therefore, handled.

The TPL also includes a “last ditch” exception-handling event on the TaskScheduler. Any unobserved exception can generate a call to the event. The following code demonstrates how to hook this event:

TaskScheduler.UnobservedTaskException +=

new EventHandler<UnobservedTaskExceptionEventArgs>

(Program.UnobservedHandle);

Unobserved exceptions can crash an application. Although the TPL exception handling lowers the bar a bit, exceptions should still not be ignored.

Understanding Cancellations

Stopping a parallel workload often means scores of tasks each in different stages of execution must somehow halt what is being done without destabilizing an application. Developers familiar with threads may notice the absence of an Abort equivalent on the Task class. Although the TPL can schedule a running task and decide whether it's safe to run a particular task before another task, the TPL is ignorant of the nature of the code it runs. Running code could entail modifying sensitive data structures, or even making physical changes to data on the hard drive.

Parallel programming requires some tools for making an “orderly exit,” as well as some way to distinguish between a “local” exit (where, for example, a single task must be canceled) and a “global” exit (where the entire process is shutting down, and must “call-in” all running tasks).

Applying Cancellations — Basics

You can use the earlier TPLParentChild sample again to demonstrate a common way to do a cancellation. Here is the TPLParentChild sample code again:

public void Run()

{

StartGettingInput();

var totalBytes = Visit(_root,TaskCreationOptions.LongRunning);

//Root is long running

Console.WriteLine("Total bytes == " + totalBytes );

}

private long Visit(string subDir, TaskCreationOptions opts)

{

CancellationTokenSource cancelLocal = new CancellationTokenSource();

var cancelJoinedToken =

CancellationTokenSource.CreateLinkedTokenSource

(_cancelGlobal.Token, cancelLocal.Token).Token;

var task = new Task<long> ( new Func<long> (() =>

{

long bytes = 0;

long bytesChildren = 0;

cancelJoinedToken.ThrowIfCancellationRequested();

var dir = new DirectoryInfo(subDir);

var children = dir.EnumerateDirectories().Select(info => {

return info.FullName; });

bytes = dir.EnumerateFiles("*." + _extension).Sum(f => {

return f.Length; });

List<Task<long>> childTasks = new List<Task<long>>();

//Run children

foreach (var directory in children)

{

var localDir = directory;

var tNew = Task.Factory.StartNew<long>(obj =>

{

return Visit(localDir,

TaskCreationOptions.AttachedToParent);

//

} , cancelJoinedToken,

TaskCreationOptions.AttachedToParent);

childTasks.Add(tNew);

}

//Get the child results

if (childTasks.Count > 0)

{

Task.WaitAll(childTasks.ToArray());

//If you get a cancel or exception the line below is

//never executed.

bytesChildren = childTasks.Sum(t =>

{ return t.Result; });

}

else { bytesChildren = 0; }

return bytes + bytesChildren;

}

)

, cancelJoinedToken, opts);

return this.RunVisit(task,cancelLocal);

}

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

Cancellations involve two classes: CancellationTokenSource and CancellationToken. Implementing a cancellation requires code that periodically checks the status on a CancellationToken.

Cancellations are handled through the exception infrastructure. Developers either throw an OperationCanceledException, or call the ThrowIfCancellationRequested method, which conditionally throws the OperationCanceledException when a cancellation is signaled. Calling Cancel on the CancellationTokenSource changes the CancellationToken status to canceled.

Like any other exception, a TaskCanceledException bubbles up through an application until it is handled. Applications must catch a TaskCanceledException to acknowledge the cancellation. Tasks have a canceled state if they are created with a CancellationToken parameter. When a cancellation is signaled, tasks associated with a canceled CancellationToken change state to canceled. Task cancellation state can also be explicitly set using another class called TaskCompletionSource.
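The mechanics just described reduce to a minimal sketch. The looping workload is hypothetical, standing in for any long-running task that cooperatively checks its token:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class CancelSketch
{
    static void Main()
    {
        var source = new CancellationTokenSource();
        var token = source.Token;

        var task = Task.Factory.StartNew(() =>
        {
            while (true)
            {
                // Cooperative check: throws an OperationCanceledException
                // once Cancel has been called on the source.
                token.ThrowIfCancellationRequested();
                Thread.Sleep(50);   // simulate a unit of work
            }
        }, token);   // passing the token lets the task end in the Canceled state

        source.Cancel();

        try { task.Wait(); }
        catch (AggregateException agExp)
        {
            // TaskCanceledException derives from OperationCanceledException.
            agExp.Handle(ex => ex is OperationCanceledException);
        }

        Console.WriteLine(task.IsCanceled);   // True
    }
}
```

Because the same token passed to ThrowIfCancellationRequested was also supplied at task creation, the task transitions to the Canceled state rather than Faulted.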

If the same behavior is executed for a “local” as well as a “global” cancellation, tokens can be joined so that only a single token must be monitored, or a single CancellationTokenSource invoked. The earlier sample demonstrated this behavior. A static function called CreateLinkedTokenSource on the CancellationTokenSource class accepts multiple Tokens and links the Tokens to a new CancellationTokenSource.
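Stripped of the directory-walking code, the linking pattern looks like the following sketch; the “global” and “local” source names are hypothetical labels for the two cancellation scopes:

```csharp
using System;
using System.Threading;

class LinkedTokenSketch
{
    static void Main()
    {
        // One "global" source for process-wide shutdown,
        // one "local" source for a single operation.
        var globalSource = new CancellationTokenSource();
        var localSource = new CancellationTokenSource();

        // The linked source is signaled when either input token is canceled.
        var linked = CancellationTokenSource.CreateLinkedTokenSource(
            globalSource.Token, localSource.Token);

        localSource.Cancel();

        Console.WriteLine(linked.IsCancellationRequested);        // True
        Console.WriteLine(globalSource.IsCancellationRequested);  // False
    }
}
```

A task monitoring linked.Token therefore responds to either cancellation scope, while canceling the local source leaves the global source untouched.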

Various task-creation and Wait methods accept a CancellationToken. Following are some examples:

public Task(Action action, CancellationToken cancellationToken);

public Task ContinueWith(Action<Task> continuationAction, CancellationToken

cancellationToken);

public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken);

public static void WaitAll(Task[] tasks, CancellationToken cancellationToken);

public static int WaitAny(Task[] tasks, CancellationToken cancellationToken);

Exercise care when including a CancellationToken parameter, especially for the Wait methods. For example, the WaitAll call from the TPLParentChild sample earlier in the chapter could just as easily have included a CancellationToken parameter, rather than checking the CancellationToken before executing. Here is the code again:

var task = new Task<long> ( new Func<long> (() =>

{

long bytes = 0;

long bytesChildren = 0;

cancelJoinedToken.ThrowIfCancellationRequested();

var dir = new DirectoryInfo(subDir);

var children = dir.EnumerateDirectories().Select(info => {

return info.FullName; });

bytes = dir.EnumerateFiles("*." + _extension).Sum(f => {

return f.Length; });

List<Task<long>> childTasks = new List<Task<long>>();

//Run children

foreach (var directory in children)

{

var localDir = directory;

var tNew = Task.Factory.StartNew<long>(obj =>

{

return Visit(localDir,

TaskCreationOptions.AttachedToParent);

}

, cancelJoinedToken,

TaskCreationOptions.AttachedToParent);

childTasks.Add(tNew);

}

//Get the child results

if (childTasks.Count > 0)

{

Task.WaitAll(childTasks.ToArray());

//If you get a cancel or exception the line below is

//never executed.

bytesChildren = childTasks.Sum(t =>

{ return t.Result; });

}

else { bytesChildren = 0; }

return bytes + bytesChildren;

}

)

, cancelJoinedToken, opts);

Code file [Ch11_Code_Samples.zip] available for download at Wrox.com

Implementing WaitAll this way could have nullified the effect of the AttachedToParent TaskCreationOption, and negatively impacted performance. Monitoring creates overhead and can change the TPL's behavior. Passing the CancellationToken to a method instructs the TPL to monitor the cancellation. The TPL would have had to monitor the CancellationToken and, therefore, may not have opted to run the child task within the thread of the parent task.

As stated earlier, the TPL doesn't like to abort a thread. To monitor a cancellation on WaitAll, the TPL must periodically check for the cancellation. Inlining attaches an additional task to the end of the completed parent task. In a solution like TPLParentChild, there may be hundreds of tasks. So, when doing something such as monitoring a cancellation, it is often better to favor approaches that minimize long-running tasks and opt for standard task scheduling, so that the TPL can check for cancellation between executing tasks.

Applying Cancellations — Register Action, Interlocked

Cancellations can be configured to run code. A delegate, lambda, Action, or method can be registered to execute when a cancellation is activated. Following is code demonstrating the Register method:

int getInput = 0;
_collector.Cancellation.Register(() =>
{
    Interlocked.Increment(ref getInput);
});

Developers can call Register on the same token multiple times, thus registering multiple Actions. The registered Action is executed synchronously, so the recommendation is that a developer do something fast and simple, as shown in the previous example. Register is helpful when, for example, a developer wants to hide the CancellationToken from another part of the application. Instead of supplying a place to receive a CancellationToken, only the Action is required.
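Multiple registrations can be sketched minimally as follows (a hypothetical example, not from the chapter's sample code); both Actions execute synchronously inside the Cancel call:

```csharp
using System;
using System.Threading;

class RegisterSketch
{
    static void Main()
    {
        var cts = new CancellationTokenSource();

        //Multiple registrations on the same token are all honored
        cts.Token.Register(() => Console.WriteLine("first registration ran"));
        cts.Token.Register(() => Console.WriteLine("second registration ran"));

        cts.Cancel(); //both registered Actions run during this call
    }
}
```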

Other parts of the application modify the getInput variable. For example, in the sample code, user input-handling code may also transition the variable. Notice how the sample utilizes the Interlocked class rather than locking the variable, or declaring a volatile variable.

Interlocked is a more performant alternative to a lock, and a safer alternative to a volatile variable. Like a lock, the Interlocked class ensures that only one thread modifies the value at a time, but it does so more cheaply than a full lock. Unlike a volatile variable, Interlocked performs the read, change, and write-back as a single indivisible step. Although the previous example uses a primitive (int), Interlocked also includes methods that handle reference types.

Following are some of the methods for the Interlocked class:

public static class Interlocked
{
    public static int Add(ref int location1, int value);
    public static long Add(ref long location1, long value);
    public static double CompareExchange(ref double location1,
        double value, double comparand);
    public static float CompareExchange(ref float location1, float value,
        float comparand);
    public static int CompareExchange(ref int location1, int value,
        int comparand);
    public static IntPtr CompareExchange(ref IntPtr location1,
        IntPtr value, IntPtr comparand);
    public static long CompareExchange(ref long location1, long value,
        long comparand);
    public static object CompareExchange(ref object location1,
        object value, object comparand);
    public static T CompareExchange<T>(ref T location1, T value,
        T comparand) where T : class;
    public static int Decrement(ref int location);
    public static long Decrement(ref long location);
    public static double Exchange(ref double location1, double value);
    public static float Exchange(ref float location1, float value);
    public static int Exchange(ref int location1, int value);
    public static IntPtr Exchange(ref IntPtr location1, IntPtr value);
    public static long Exchange(ref long location1, long value);
    public static object Exchange(ref object location1, object value);
    public static T Exchange<T>(ref T location1, T value) where T : class;
    public static int Increment(ref int location);
    public static long Increment(ref long location);
    public static long Read(ref long location);
}

Internally, updating a variable is a two-step (or more) process: first a read, then a write of the new value. A thread could be preempted just after the read but before the write, allowing another thread's update to be lost. When the methods just described are invoked, the operation is performed atomically, so the read and write are never separated.
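To make the hazard concrete, the following minimal sketch (hypothetical, not from the chapter's sample code) races several tasks against two counters. The unsynchronized ++ can lose updates when a preemption lands between its read and write; Interlocked.Increment cannot:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class InterlockedDemo
{
    static int _unsafeCount = 0;
    static int _safeCount = 0;

    static void Main()
    {
        var tasks = Enumerable.Range(0, 8).Select(_ => Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 100000; i++)
            {
                _unsafeCount++;                        //read-modify-write; updates can be lost
                Interlocked.Increment(ref _safeCount); //atomic; never loses an update
            }
        })).ToArray();
        Task.WaitAll(tasks);

        //_safeCount is always 800000; _unsafeCount is usually less under contention
        Console.WriteLine("unsafe = {0}, safe = {1}", _unsafeCount, _safeCount);
    }
}
```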

Using Concurrent Collections — ConcurrentQueue

Concurrency often involves shared memory. Tasks may need to share read-and-write access to a collection. Because threads may run concurrently, some coordination is needed so that only one thread writes at a time, and locking the object before writing has traditionally been the way to ensure this. Locking creates contention, however, and contention can slow performance.

Although achieving this through locking has always been possible in the .NET Framework, the TPL includes optimized collections for parallel operations. ConcurrentQueue is one of a handful of concurrent collections found in the System.Collections.Concurrent namespace. Following is the ConcurrentQueue class:

public class ConcurrentQueue<T> : IProducerConsumerCollection<T>,
    IEnumerable<T>, ICollection, IEnumerable
{
    public ConcurrentQueue();
    public ConcurrentQueue(IEnumerable<T> collection);
    public int Count { get; }
    public bool IsEmpty { get; }
    public void CopyTo(T[] array, int index);
    public void Enqueue(T item);
    public IEnumerator<T> GetEnumerator();
    public T[] ToArray();
    public bool TryDequeue(out T result);
    public bool TryPeek(out T result);
}

As would be expected, ConcurrentQueue includes the usual queuing operations, although peeking and dequeuing take the TryPeek and TryDequeue forms so that an empty queue can be handled without an exception. The following code demonstrates using ConcurrentQueue to queue a keystroke so that the TPLParentChild sample can run a cancellation through user input:

private ConcurrentQueue<ConsoleKey> _cancelAllCommand =
    new ConcurrentQueue<ConsoleKey>();
private ConcurrentQueue<ConsoleKey> _cancelLocalCommand =
    new ConcurrentQueue<ConsoleKey>();

private void StartGettingInput()
{
    var task = new Task(() =>
    {
        var getInput = true;
        while (getInput)
        {
            var key = Console.ReadKey().Key; //Block here until a key is entered
            switch (key)
            {
                case ConsoleKey.C:
                    _cancelAllCommand.Enqueue(key);
                    break;
                case ConsoleKey.K:
                    _cancelLocalCommand.Enqueue(key);
                    break;
                case ConsoleKey.Enter:
                    getInput = false;
                    break;
            }
        }
    }
    , TaskCreationOptions.LongRunning);
    task.Start();
}

Part of the RunVisit method demonstrated earlier in this chapter scans the cancellation queues for input, and executes either a global or local cancellation, depending on the input key. Following is the code again:

private long RunVisit(Task<long> task, CancellationTokenSource cancelLocal)
{
    try
    {
        task.Start();
        this.ExecuteOnInput(cancelLocal);
        task.Wait();
        return task.Result;
    }
    catch (AggregateException agExp)
    {
        agExp.Flatten().Handle(this.HandleException);
        return 0;
    }
}

private void ExecuteOnInput(CancellationTokenSource cancelLocal)
{
    ConsoleKey key = default(ConsoleKey);
    if (_cancelAllCommand.TryDequeue(out key))
    {
        _cancelGlobal.Cancel();
    }
    if (_cancelLocalCommand.TryDequeue(out key))
    {
        cancelLocal.Cancel();
    }
}

Like many of the concurrent collections, ConcurrentQueue implements the IProducerConsumerCollection interface. This becomes more important later in this chapter. The IProducerConsumerCollection interface is implemented explicitly, so its members do not appear in IntelliSense. The methods are visible only when accessing the class through an interface variable, as shown in the following example:

IProducerConsumerCollection<int> prod = new ConcurrentQueue<int>();
int val = -1;
prod.TryAdd(5);
prod.TryTake(out val);
Console.WriteLine("Here was the added value " + val.ToString());

Understanding Continuations

Making a task's execution contingent on the success or failure of another task is accomplished using a continuation. A continuation works a lot like an event callback or delegate. A continuation can be any type of code that is compatible with a task. Examples are Action, lambda expression, Func<T>, and so on.

When the TPL executes a continuation, it either creates and schedules another task, or it runs the continuation Action within the thread of the completed task. The task on which a continuation is contingent is often referred to as the continuation's antecedent.

Following are examples of continuations:

//Single Continuation
var actToRun = new Action<Task<string>>(antecedent =>
{
    Console.WriteLine("Task result was " + antecedent.Result);
});
var taskWithContinue = new Task<string>(() =>
{
    return "yes it ran";
});
taskWithContinue.ContinueWith(actToRun);
taskWithContinue.Start();
taskWithContinue.Wait();

//Multiple antecedents
var list = new List<Task<string>>();
list.Add(Task.Factory.StartNew<string>(() =>
{
    Console.WriteLine("Run 1");
    return "task 1 ran";
}));
list.Add(Task.Factory.StartNew<string>(() =>
{
    Console.WriteLine("Run 2");
    return "task 2 ran";
}));
list.Add(Task.Factory.StartNew<string>(() =>
{
    Console.WriteLine("Run 3");
    return "task 3 ran";
}));
var actAfterAll = new Action<Task<string>[]>(tasks =>
{
    Console.WriteLine("Running AfterAll Continuation...");
    foreach (var t in tasks)
    { Console.WriteLine(t.Result); }
});
Task.Factory.ContinueWhenAll(list.ToArray(), actAfterAll);

As demonstrated in these samples, a continuation can be joined to a task or the default TaskScheduler. Continuations can execute contingent on the completion, success, or failure of a task, or the completion of an array of tasks. Often, a continuation needs to examine the Result of the antecedent. So, a continuation must be configured to accept the antecedent task. Multiple ContinueWith invocations can register a continuation Action for each invocation. The following sample code demonstrates this:

var task = new TaskCompletionSource<string>(new
    List<Action<Task<string>>>());
var acts = new Action<Task<string>>[3]
{
    t =>
    {
        Thread.Sleep(2000);
        Console.WriteLine("0 ran..." + t.Result + " " +
            Thread.CurrentThread.ManagedThreadId.ToString());
    }
    , t =>
    {
        Thread.Sleep(2000);
        Console.WriteLine("1 ran..." + t.Result + " " +
            Thread.CurrentThread.ManagedThreadId.ToString());
    }
    , t =>
    {
        Thread.Sleep(2000);
        Console.WriteLine("2 ran..." + t.Result + " " +
            Thread.CurrentThread.ManagedThreadId.ToString());
    }
}; //end of array
//Starts separate Task for each
task.Task.ContinueWith(acts[0]);
task.Task.ContinueWith(acts[1]);
task.Task.ContinueWith(acts[2]);
task.SetResult("Task result here");

Like a task, a continuation has TaskContinuationOptions such as LongRunning, AttachedToParent, and PreferFairness. Continuations also work with CancellationTokens. For example, continuations that have not fired, but are configured with a CancellationToken that has been signaled, can move to the canceled state.

Continuations can fulfill multiple execution requirements. For example, a continuation can be configured to handle only exceptions. Continuations can be arranged into a dependency graph so that downstream Actions won't start until one or more upstream antecedents have completed.
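An exception-only continuation might be sketched as follows (a minimal, hypothetical example using TaskContinuationOptions.OnlyOnFaulted); the continuation runs only when the antecedent faults, and reading its Exception property observes the fault:

```csharp
using System;
using System.Threading.Tasks;

class FaultContinuationDemo
{
    static void Main()
    {
        var risky = Task.Factory.StartNew(() =>
        {
            throw new InvalidOperationException("something broke");
        });

        //Runs only if the antecedent faulted; touching Exception here
        //observes the fault so it is not rethrown as unobserved
        var handler = risky.ContinueWith(t =>
        {
            Console.WriteLine("Handled: " +
                t.Exception.Flatten().InnerException.Message);
        }, TaskContinuationOptions.OnlyOnFaulted);

        handler.Wait();
    }
}
```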

TaskCompletionSource

Earlier in this chapter, you learned that a running task must execute on a thread somewhere within the TPL. That means that running a task not only occupies a thread for the duration of the task, but it also takes time to queue and schedule. Situations arise when a developer may want to, for example, activate a continuation without incurring the overhead to execute a task.

The following code demonstrates such a situation. In the code, a timer performs a delayed task execution.

var timerTime = new TimeSpan(0, 0, 5);
TaskCompletionSource<DateTime> comp = null;
Timer timer = null;
comp = new TaskCompletionSource<DateTime>();
timer = new System.Threading.Timer(obj =>
{
    comp.SetResult(DateTime.Now);
    timer.Dispose();
}
, null, timerTime, TimeSpan.FromMilliseconds(Timeout.Infinite));
comp.Task.Wait();

System.Threading.Timer handles the execution, so a blocked or running task would be redundant and would waste a thread. According to the .NET documentation, Dispose must be run when the timer is no longer needed. TaskCompletionSource has a Task property, and the code executes a Wait on that Task property. TaskCompletionSource is also a useful place to hang a continuation without scheduling a task or tying up a blocked thread waiting for the timer to fire.

The example called the SetResult method, but there are other methods for transitioning to a canceled or exception state. Like the Task class, the TaskCompletionSource constructor also supports TaskCreationOptions and a state parameter, which is demonstrated later in this chapter.
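The Try variants of these methods make competing completion attempts safe. A hedged sketch (not from the chapter's samples): the first transition wins, and later attempts report failure instead of throwing.

```csharp
using System;
using System.Threading.Tasks;

class TrySetSketch
{
    static void Main()
    {
        var comp = new TaskCompletionSource<int>();

        //First transition wins; later attempts return false instead of throwing
        Console.WriteLine(comp.TrySetResult(42));                       //True
        Console.WriteLine(comp.TrySetException(new Exception("late"))); //False
        Console.WriteLine(comp.Task.Result);                            //42
    }
}
```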

Implementing Continuations

The following sample code is part of another solution to the .docx totaling problem introduced at the beginning of this chapter. Instead of relying on the TPL's AttachedToParent parent-child option, the code explicitly defines the parent-child relationship using continuations.

private class TaskVisitorFactory : ITaskVisitorFactory
{
    private CancellationToken _cancelGlobal;
    private IProcessorResults _results = null;

    public TaskVisitorFactory(CancellationToken cancelGlobal,
        IProcessorResults results)
    {
        _cancelGlobal = cancelGlobal;
        _results = results;
    }

    public void Create(TaskCompletionSource<string> allChildCompletion,
        string directoryPath, IEnumerable<string> children, string extension)
    {
        List<Task> tasks = new List<Task>();
        if (!(_cancelGlobal.IsCancellationRequested))
        {
            foreach (var child in children)
            {
                var curChild = child;
                var visitor = new DirectoryVisitor(_results, this,
                    curChild, extension);
                var task = new Task(visitor.Visit, _cancelGlobal);
                //Handle cancellation at the individual task level.
                //Continue when all have been cancelled; some may have run;
                //this allows everything to happen in natural flow
                task.ContinueWith(tCont => { visitor.CancelVisit(); }
                    , TaskContinuationOptions.OnlyOnCanceled
                    | TaskContinuationOptions.ExecuteSynchronously);
                //The task is completed when the Action completes
                //and all of its children signal complete
                tasks.Add(task); //Child
                tasks.Add(visitor.TaskVisitAllChildren); //Child's children
                task.Start();
            }
        }
        //When all the children and children's children complete, signal
        //parent children complete
        if (tasks.Count > 0)
        {
            Task.Factory.ContinueWhenAll(tasks.ToArray(), t =>
            {
                foreach (var cur in t)
                {
                    if (cur.Exception != null)
                    {
                        cur.Exception.Handle(e =>
                        {
                            Console.WriteLine("EXCEPTION Observed: " +
                                e.Message);
                            return true;
                        });
                    }
                }
                allChildCompletion.SetResult(directoryPath);
            });
        }
        else
        { allChildCompletion.SetResult(directoryPath); }
    }
}

Figure 11.4 shows the assembled relationships.

Figure 11.4 TPLContinuation sample layout

11.4

You create multiple antecedent continuations on the Task.Factory property; multiple task antecedents are handled by the TaskFactory class. ContinueWhenAll includes the immediate child tasks and the task associated with the child's children. A task's children's children are represented by a TaskCompletionSource. A parent task registers a continuation that waits for all tasks it has created, as well as the TaskCompletionSource contained in each created child task. Also demonstrated is configuring a continuation to fire only on cancellation via TaskContinuationOptions.OnlyOnCanceled. TaskContinuationOptions supports combinations of options. However, care should be taken to ensure mutually exclusive options are not combined.

Although it is not as obvious, notice how this solution has a recursive feel to it. Just like the TPLParentChild solution discussed earlier in the chapter, this solution takes a divide-and-conquer approach. Each directory computation visit is allocated to a task. Marking completions is explicitly dealt with by another class assembling the relationships with continuations.

The TPLParentChild and standard recursive examples earlier in the chapter implicitly relied on the structure of the algorithm to mark completed directories. Essentially, the marking of completed and visiting were coupled together.

Receiving a total from all children meant that the directory, its children, and children's children were all processed. Completing the root directory meant the whole process was completed.

TPLParentChild was much simpler, but not as flexible. By coupling the computation to the completion, TPLParentChild could not, for example, fork off some additional calculations unrelated to the parent-child task relationship without making major changes to the algorithm.

Rather than allocating and running a blocking task (and thread), the child's children task is a TaskCompletionSource. Executing ContinueWhenAll runs a continuation that marks all children complete. The arrangement frees threads for doing directory visits and leaves the progress and ordering to relationships expressed in continuations.

Each task created in the previous code executes the following DirectoryVisitor class code:

public Task<string> TaskVisitAllChildren
{ get { return _completionAllChildren.Task; } }

public void Visit()
{
    var comp = new ProgressMessage(_directoryPath);
    try
    {
        var dir = new DirectoryInfo(_directoryPath);
        long bytes = 0;
        IEnumerable<string> children =
            dir.EnumerateDirectories().Select(info =>
                { return info.FullName; });
        bytes = dir.EnumerateFiles("*." + _extension).Sum(f =>
            { return f.Length; });
        //Report results to Collector
        comp.SetMessagePayload(new DirectoryVisitComplete(bytes));
        _results.PostResults(comp);
        //Spawn children
        _factory.Create(_completionAllChildren, _directoryPath,
            children, _extension);
    }
    catch (Exception ex)
    {
        comp.SetMessagePayload(new DirectoryVisitException(ex));
        _completionAllChildren.SetResult(_directoryPath); //No children to process
        _results.PostResults(comp);
    }
}

An exception is generated if a developer attempts to change the status of a TaskCompletionSource that has already reached a final state. Utilizing the Try methods is a safe way to change status when multiple places could attempt the change. In the example, a status change could be attempted both in the ContinueWhenAll continuation and in the task. Totals are sent to a separate process that collects and summarizes the results; this process is examined later in this chapter.

AsyncState

As stated earlier in this chapter, tasks implement IAsyncResult, and one of the IAsyncResult implementation requirements is the AsyncState property. Tasks participating in asynchronous calls make use of this property, but for other tasks, the property is typically null — an empty piece of class real estate. Though using AsyncState with other tasks is not required, its existence means a developer could attach other pieces of information to a task, and indirectly to a TaskCompletionSource.

This section demonstrates how the property can be leveraged but focuses on what can be done, rather than what is recommended practice.

Extension functions enable a developer to add methods without creating subclasses or interfaces. They're ideal for tapping what was attached to AsyncState. The following example code demonstrates how classes can be attached to AsyncState, and how a couple of extension functions leverage AsyncState:

static void RunAsyncState()
{
    var action = new Action(() =>
    {
        Console.WriteLine("Running the Async Action");
        Thread.Sleep(2000);
    });
    var objs = new Tuple<string, Action>("I'm Tom", action);
    var comp = new TaskCompletionSource<object>(objs);
    comp.Task.ContinueWith(t => { Console.WriteLine("Continuation ran.."); });
    Console.WriteLine("Starting Async Action..");
    comp.GetAttachedAction()();
    Console.WriteLine("Do something else here...");
    Thread.Sleep(500);
    Console.WriteLine("Completed Async Action.. its message was " +
        comp.GetAttachedMessage());
    //Activate the continuation
    comp.SetResult(null);
}

static class TPLExts
{
    public static Action GetAttachedAction<T>(this TaskCompletionSource<T>
        taskComp)
    {
        return ((Tuple<string, Action>)taskComp.Task.AsyncState).Item2;
    }

    public static string GetAttachedMessage<T>(this TaskCompletionSource<T>
        taskComp)
    {
        return ((Tuple<string, Action>)taskComp.Task.AsyncState).Item1;
    }
}

The attached tuple is composed of a string and an Action. Extension functions (GetAttachedAction, GetAttachedMessage) are implemented on the TaskCompletionSource. You learned about the significance of TaskCompletionSource earlier in this chapter.

In the example, the associated Action is stored in the AsyncState, and task status is manually set after some additional work following the Action invocation. This arrangement would be useful in situations in which a task is needed for a continuation (as in the example), but scheduling a task would not be desirable.

The string value is used to identify the task or TaskCompletionSource. The string could have been printed in a Trace statement or viewed in the Debugger. An application may have hundreds of running tasks. The string value may be useful for distinguishing tasks.

Using the BlockingCollection Class

Asynchronous messaging and isolation are ways to avoid shared state and to decouple parallel operations. Instead of modifying a common place in memory and invoking methods on each other, parallel operations pass copies of data. Concurrent operations may also operate at different speeds. Queuing messages is a common way to throttle processing disparities.

Figure 11.5 shows a typical internal asynchronous messaging solution with a queue.

Figure 11.5 Asynchronous messaging

11.5

Actors/agents are often used to characterize parallel programming solutions centered on messaging. Producer/consumer and pipelining both involve portions of a process that emit data and portions that manipulate what is emitted. In each of these patterns, the solution involves two or more components coordinating their activities by exchanging data. If the data exchange is not done efficiently, it can become a bottleneck in a developer's code.

Writing and removing both modify the underlying data structure and require some means (such as a lock) to share access. Some code segments may take longer to process than other parts. Queuing helps, but some way to pause the producer, or allow the consumer to do something else while waiting for data, is essential to building flexible, efficient algorithms.

A BlockingCollection class has been built for handling all these scenarios.

The following code shows how to use a BlockingCollection class:

var blockingCollection = new BlockingCollection<string>();
//ConcurrentQueue is default
Task.Factory.StartNew(() =>
{
    for (int n = 0; n < 10; ++n)
    {
        blockingCollection.Add(n.ToString());
        Console.WriteLine("Added " + n.ToString());
    }
    blockingCollection.CompleteAdding();
});
Task.Factory.StartNew(() =>
{
    foreach (var val in blockingCollection.GetConsumingEnumerable())
    {
        Console.WriteLine("Removed " + val);
        Thread.Sleep(1000);
    }
    Console.WriteLine("Done, CompleteAdding has been called.");
});

BlockingCollection offers a variety of ways to add and consume data. GetConsumingEnumerable is recommended over a regular foreach iteration; as the method name implies, each iteration removes an item from the collection. TryTake enables optional timeouts if data is not present within a given interval. Take blocks until data is present. However, Take can generate an exception when attempting a remove after CompleteAdding has been called.
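The Take-after-CompleteAdding behavior can be sketched minimally (a hypothetical example, not from the chapter's samples): queued items still drain, but a Take on an empty, completed collection throws.

```csharp
using System;
using System.Collections.Concurrent;

class TakeAfterCompleteSketch
{
    static void Main()
    {
        var col = new BlockingCollection<int>();
        col.Add(1);
        col.CompleteAdding();

        Console.WriteLine(col.Take()); //1 — queued items still drain

        try
        {
            col.Take(); //empty and marked complete
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("Take threw after CompleteAdding");
        }
    }
}
```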

The following code snippet shows some other ways to access the BlockingCollection:

var blockingCollection = new BlockingCollection<string>(5);
//Capacity throttles process
Task.Factory.StartNew(() =>
{
    for (int n = 0; n < 10; ++n)
    {
        while (!blockingCollection.TryAdd(n.ToString(), 300))
        { Console.WriteLine("Just did try add and failed " + n.ToString()); }
        Console.WriteLine("Added " + n.ToString());
    }
    blockingCollection.CompleteAdding();
});
Task.Factory.StartNew(() =>
{
    var val = "";
    while (blockingCollection.TryTake(out val))
    {
        Console.WriteLine("Removed " + val);
        Thread.Sleep(1000);
    }
    Console.WriteLine("Done, CompleteAdding has been called.");
});

Adding a bounded capacity blocks the producing process and allows the consuming process to catch up. The preceding code is bounded to five messages, and a slight processing delay was added to the consuming side. Notice how the producer cannot proceed until space is made in the BlockingCollection. In fact, the loop around TryAdd repeatedly attempts the add until it succeeds.

By default, internally, a BlockingCollection uses a ConcurrentQueue, but anything supporting the IProducerConsumerCollection interface is supported. Following is an example using the ConcurrentStack:

var blockingCollection = new BlockingCollection<string>(
    new ConcurrentStack<string>());

With the ConcurrentStack, ordering in the BlockingCollection becomes Last-In-First-Out (LIFO). BlockingCollection has been optimized for performance and, internally, utilizes lightweight locking mechanisms.

Working with a BlockingCollection

As stated earlier, BlockingCollection supports scenarios centered around messaging. Earlier demonstrations showed how to configure the BlockingCollection. This section demonstrates an implementation from the TPLContinuation sample.

As stated earlier, the concurrent collections underpinning BlockingCollection can gracefully handle the contention often involved when disparate threads are adding data. The DirectoryVisitor class posts messages to the BlockingCollection. A ProgressMessageCollector class gathers the results, as shown here:

private IProcessorResults _processor;

public void Process(Task rootCompletion)
{
    var tryTakeFailedCount = 0;
    var task = Task.Factory.StartNew(() =>
    {
        try
        {
            ProgressMessage result = null;
            bool gotOne = false;
            while (!(rootCompletion.IsCompleted))
            {
                gotOne = _processor.TryTake(out result);
                if (gotOne)
                {
                    switch (result.GetProgressType())
                    {
                    }
                }
                else
                { tryTakeFailedCount = tryTakeFailedCount + 1; }
            }
        }
        catch (Exception ex)
        { Console.WriteLine("EXCEPTION " + ex.Message + ex.StackTrace); }
    }, TaskCreationOptions.LongRunning);
    task.Wait();
    Console.WriteLine("Total bytes == " + _totalBytes.ToString() +
        " try take failed count " + tryTakeFailedCount.ToString() + " times");
}

The whole arrangement looks much like Figure 11.6.

Figure 11.6 Visitor messaging

11.6

The collector calls TryTake with a timeout. While developing the solution, a 10-millisecond timeout seemed to result in the fewest expiration failures. Although there is no recommended time limit in the TPL documentation, one of the overloads is geared specifically to milliseconds. Overloads like this normally imply that the common scenario involves something less than a 1-second value.

The ProgressMessage class is a receptacle, as shown here:

public sealed class ProgressMessage
{
    public string Identification { get; private set; }
    public Type PayloadType { get; private set; }
    private object _value = null;

    public void SetMessagePayload(object value)
    {
        _value = value;
        this.PayloadType = value.GetType();
    }

    public T GetPayload<T>() { return (T)_value; }

    public ProgressMessage(string identification)
    {
        this.Identification = identification;
    }
}

The message is a receptacle for a more specific payload. Separating the two parts allows for payload flexibility. Another portion gathering cancellation commands from user input also posts messages to the BlockingCollection.

BlockingCollection is hidden behind a class interface implementation, as shown here:

private class ProcessorResults : IProcessorResults
{
    private BlockingCollection<ProgressMessage> _resultsQueue =
        new BlockingCollection<ProgressMessage>(1024);

    public void PostResults(ProgressMessage result)
    {
        _resultsQueue.Add(result);
    }

    public bool TryTake(out ProgressMessage result)
    {
        return _resultsQueue.TryTake(out result, 10);
    }
}

Because BlockingCollection is a full-function data structure, it's often better to hide its usage behind a narrower set of options. Exposing full collection functionality means a developer could invoke methods that an architect never intended.

Understanding SpinWait.SpinUntil

SpinWait lives apart from the TPL in the System.Threading namespace. SpinWait is a code-blocking alternative to methods such as Thread.Sleep: rather than telling the operating system to wake the thread after a period of time, it spins briefly and yields its timeslice. SpinWait has uses across all TPL-based solutions. TPL solutions often need to block and check for some changing condition before proceeding. SpinUntil combines the blocking and the checking in a single method. In addition, SpinUntil supports timeouts.

Following are some overloads of the SpinUntil method:

public static void SpinUntil(Func<bool> condition);
public static bool SpinUntil(Func<bool> condition, int millisecondsTimeout);
public static bool SpinUntil(Func<bool> condition, TimeSpan timeout);
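A minimal use of the timeout overload might look like this (a hypothetical sketch, not from the chapter's samples): the current thread spins and yields until a flag set by another task is observed, or until the timeout elapses.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SpinUntilSketch
{
    static void Main()
    {
        var done = 0;
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(200);
            Interlocked.Exchange(ref done, 1); //publish the flag atomically
        });

        //Spin, then yield, until the condition holds or 5 seconds elapse
        var observed = SpinWait.SpinUntil(
            () => Thread.VolatileRead(ref done) == 1,
            TimeSpan.FromSeconds(5));

        Console.WriteLine("Condition observed: " + observed);
    }
}
```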

Because this chapter's SpinUntil example utilizes the BlockingCollection, SpinWait.SpinUntil is introduced here. In the following BlockingCollection array extension function code example, SpinUntil blocks until data appears on any of the array's BlockingCollections:

static class TPLExts
{
    public static bool WaitForData<T>(this BlockingCollection<T>[] collections,
        int timeout)
    {
        var found = false;
        found = SpinWait.SpinUntil(() =>
        {
            return collections.Where(c => { return c.Count > 0; }).Count() > 0;
        }, timeout);
        return found;
    }
}

As stated earlier, BlockingCollection is built for performance, so this is efficient code. SpinWait.SpinUntil periodically calls the supplied function between yields. Applying WaitForData could work as shown in the following sample:

//Create an array of BlockingCollections
var collections = new BlockingCollection<string>[]
    { new BlockingCollection<string>(), new BlockingCollection<string>() };
var found = false;
var indexWithData = 0;
found = collections.WaitForData(100);
Console.WriteLine("Was data found " + found.ToString());
collections[indexWithData].Add("Some data");
found = collections.WaitForData(100);
Console.WriteLine("Was data found " + found.ToString());
if (found) //Start Task to service the BlockingCollection
{
    Task.Factory.StartNew(() =>
    {
        Console.WriteLine("Service data " + collections[indexWithData].Take());
    });
}

In the scenario envisioned in the previous code, a developer waits for data to appear before consuming it, delegating consumption to another task when one of the BlockingCollections contains data. TryTake removes data and, therefore, was not an acceptable solution.

Like TryTake, the BlockingCollection.TakeFromAny was also not an acceptable solution. Rather than creating multiple blocked tasks, each monitoring a single BlockingCollection, it was more efficient to create a single long-running task that monitors multiple BlockingCollections. In the example, the long-running task is the current thread. When data does appear, a task is created to service the BlockingCollection.
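For contrast, the following sketch shows why TakeFromAny did not fit the scenario: it blocks the calling thread until an item is available on any of the collections, and it removes that item as a side effect. This fragment is illustrative and not part of the chapter's download code; the collection contents are hypothetical:

```csharp
using System;
using System.Collections.Concurrent;

class TakeFromAnySketch
{
    static void Main()
    {
        var collections = new[]
        {
            new BlockingCollection<string>(),
            new BlockingCollection<string>()
        };
        collections[1].Add("payload");

        // TakeFromAny blocks until an item is available on any collection,
        // removes the item, and reports which collection supplied it.
        string item;
        int index = BlockingCollection<string>.TakeFromAny(collections, out item);
        Console.WriteLine("Took '" + item + "' from collection " + index);
    }
}
```

Because TakeFromAny both blocks and consumes, the WaitForData extension method shown earlier is the better fit when the goal is simply to detect data and hand consumption off to another task.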

SpinWait.SpinUntil has a myriad of other uses. Many of the uses involve checking a property on a TPL class. The following list shows some other examples.

· Creating a continuation without creating a task (spinning and checking the IsCompleted property on an antecedent)

· Coupling waiting on a task with waiting on BlockingCollection data

· Waiting for a cancellation
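The first item in the list above can be sketched as follows. This is a hypothetical fragment rather than code from the chapter's download; the antecedent work and the 500-millisecond timeout are illustrative:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SpinUntilContinuationSketch
{
    static void Main()
    {
        // Antecedent task doing some brief work
        var antecedent = Task.Factory.StartNew(() => Thread.Sleep(50));

        // Spin until the antecedent finishes (or the timeout elapses),
        // then run the follow-up work inline -- no ContinueWith required.
        var completed = SpinWait.SpinUntil(() => antecedent.IsCompleted, 500);

        if (completed)
        {
            Console.WriteLine("Antecedent finished; running follow-up work");
        }
    }
}
```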

Summary

Learning the Task Parallel Library (TPL) starts with understanding a core set of classes. Most important of all the classes are tasks. Developers package code, configure, and submit tasks to the TPL. Tasks store the results of the executing code. Results can be some value, or even an exception.

When dealing with exceptions, the TPL utilizes the standard .NET exception handling. However, the nature of parallel tasks means that hundreds of tasks could generate hundreds of exceptions. An aptly named class called AggregateException simplifies the collection-and-handling process.

Aside from exceptions, tasks must deal with gracefully aborting what is being done. Cancellations provide mechanisms for stopping tasks. One of the TPL's greatest strengths is its capability to join completed or failed tasks to a new task. Joining a finished task to another task is called a continuation. Continuation composition is not restricted to a single task, nor is it limited to just TPL-based parallel components.

The TaskCompletionSource class can be helpful when working with asynchronous or other thread-based components of the .NET Framework. TaskCompletionSource enables a developer to control a task's result, joining the older asynchronous patterns to the new TPL model.

Collections are everywhere in the .NET Framework. Collections present unique challenges when coupled with concurrency. The TPL includes concurrency-friendly collections. These underlying containers support another class called the BlockingCollection.

A BlockingCollection enables parallel components to share data using a messaging pattern often called actor/agent, producer/consumer, or pipelining. BlockingCollections, as well as all TPL classes, can benefit from some of the other new components in the .NET Framework — in particular, the SpinWait class.

About the Author

Jeffrey Juday is a developer specializing in enterprise integration solutions utilizing SharePoint, WCF, WF, and SQL Server. He has been developing software with Microsoft tools for more than 15 years in a variety of industries, including military, manufacturing, financial services, management consulting, and computer security. Juday is a Microsoft MVP. When not writing or developing, he spends his time with his wife and daughter.