
18. Multithreading

Two significant trends of the past decade have had an enormous effect on the field of software development. First, the continued decrease in the cost of performing computations is no longer driven by increases in clock speed and transistor density, as illustrated by Figure 18.1. Rather, the cost of computation is now falling because it is economical to make hardware that has multiple CPUs.


FIGURE 18.1: Clock Speeds over Time
(Graph compiled by Herb Sutter. Used with permission. Original at www.gotw.ca.)

Second, computations now routinely involve enormous latency. Latency is, simply put, the amount of time required to obtain a desired result. There are two principal causes of latency. Processor-bound latency occurs when the computational task is complex; if a computation requires performing 12 billion arithmetic operations and the total processing power available is only 6 billion operations per second, at least 2 seconds of processor-bound latency will be incurred between asking for the result and obtaining it. I/O-bound latency, by contrast, is latency incurred by the need to obtain data from an external source such as a disk drive, web server, and so on. Any computation that requires fetching data from a web server physically located far from the client machine will incur latency equivalent to millions of processor cycles.

These two trends together create an enormous challenge for modern software developers. Given that machines have more computing power than ever, how are we to make effective use of that power to deliver results to the user quickly, and without compromising on the user experience? How do we avoid creating frustrating user interfaces that freeze up when a high-latency operation is triggered? Moreover, how do we go about splitting CPU-bound work among multiple processors to decrease the time required for the computation?

The standard technique for engineering software that keeps the user interface responsive and CPU utilization high is to write multithreaded programs that do multiple computations “in parallel.” Unfortunately, multithreading logic is notoriously difficult to get right; we’ll spend the next two chapters exploring what makes multithreading difficult, and learning how to use higher-level abstractions and new language features to ease that burden.

The higher-level abstractions we’ll discuss are, first, the two principal components of the Parallel Extensions library that was released with .NET 4.0¹—the Task Parallel Library (TPL) and Parallel LINQ (PLINQ)—and second, the Task-based Asynchronous Pattern (TAP) and its accompanying language support in C# 5.0. Although we strongly encourage you to use these higher-level abstractions, we will also cover some of the lower-level threading APIs from previous versions of the .NET runtime in this chapter. Additional multithreading patterns prior to C# 5.0 are available for download at http://IntelliTect.com/EssentialCSharp along with the chapters from Essential C# 3.0. Thus, if you want to fully understand multithreaded programming without the later features, you still have access to that material.

1. These libraries are available in .NET 3.5 by downloading the Reactive Extensions library for .NET 3.5, but this is not officially supported.

We’ll start this chapter with a few beginner topics in case you are new to multithreading. Then we’ll briefly discuss “traditional” thread manipulation without using the Parallel Extensions libraries to ensure that you have a basic understanding of thread manipulation; the following chapter goes into more details on that topic. We’ll then spend most of this chapter covering the TPL, TAP, and PLINQ, in that order.

Multithreading Basics


Beginner Topic: Multithreading Jargon

There is a lot of confusing jargon associated with multithreading, so let’s define a few terms.

A CPU (central processing unit) or core² is the unit of hardware that actually executes a given program. Every machine has at least one CPU, though today multiple-CPU machines are common. Many modern CPUs support simultaneous multithreading (which Intel trademarks as Hyper-Threading), a mode where a single CPU can appear as multiple “virtual” CPUs.

2. Technically we ought to say that “CPU” always refers to the physical chip and “core” may refer to a physical or virtual CPU. This distinction is unimportant for the purposes of this book, so we will use these terms interchangeably.

A process is a currently executing instance of a given program; the fundamental purpose of the operating system is to manage processes. Each process contains one or more threads. A process is represented by an instance of the Process class in the System.Diagnostics namespace.

C# programming at the level of statements and expressions is fundamentally about describing flow of control, and thus far in this book we’ve made the implicit assumption that a given program has only a single “point of control.” You can imagine the point of control as being a cursor that enters the text of your program at the Main method when you start it up, and then moves around the program as the various conditions, loops, method calls, and so on, are executed. A thread is this point of control. A thread is represented by an instance of the System.Threading.Thread class, and the API for manipulating a Thread is in the same System.Threading namespace.

A single-threaded program is one in which there is only one thread in the process. A multithreaded program has two or more threads in the process.

A piece of code is said to be thread safe if it behaves correctly when used in a multithreaded program. The threading model of a piece of code is the set of requirements that the code places upon its caller in exchange for guaranteeing thread safety. (For example, the threading model of many classes is “static methods may be called from any thread but instance methods may be called only from the thread that allocated the instance.”)

A task is a unit of potentially high-latency work that produces a resultant value or desired side effect. The distinction between tasks and threads is as follows: A task represents a job that needs to be performed, whereas a thread represents the worker that does the job. A task that is useful only for its side effects is represented by an instance of the Task class. A task used to produce a value of a given type is represented by the Task<T> class, which derives from the nongeneric Task type. These can be found in the System.Threading.Tasks namespace.

A thread pool is a collection of threads, along with logic for determining how to assign work to those threads. When your program has a task to perform, it can obtain a worker thread from the pool, assign it the task, and then return the thread to the pool when the work completes, thereby making it available the next time additional work is requested.



Beginner Topic: The Why and How of Multithreading

There are two principal scenarios for multithreading: enabling multitasking and dealing with latency.

Users think nothing of running dozens of processes at the same time. They might have presentations and spreadsheets open for editing while at the same time they are browsing documents on the Internet, listening to music, receiving instant messages and email arrival notifications, and watching the little clock in the corner. Each of these processes has to continue to do its job even though it is not the only task the machine has to attend to. This kind of multitasking is usually implemented at the process level, but there are situations in which you want to do this sort of multitasking within a single process.

For the purposes of this book, however, we will mostly be considering multithreading as a technique for dealing with latency. For example, to import a large file while simultaneously allowing a user to click Cancel, a developer creates an additional thread to perform the import. By performing the import on a different thread, the user can request cancellation instead of freezing the user interface until the import completes.

If enough cores are available that each thread can be assigned a core, each thread essentially gets its own little machine. However, more often than not there are more threads than cores. Even the relatively common multicore machines of today still have only a handful of cores, while each process could quite possibly run dozens of threads.

To overcome the discrepancy between the numerous threads and the handful of cores, an operating system simulates multiple threads running concurrently by time slicing. The operating system switches execution from one thread to the next so quickly that it appears the threads are executing simultaneously. The period of time that the processor executes a particular thread before switching to another is the time slice or quantum. The act of changing which thread is executing in a given core is called a context switch.

The effect is similar to that of a fiber-optic telephone line in which the fiber-optic line represents the processor and each conversation represents a thread. A (single-mode) fiber-optic telephone line can send only one signal at a time, but many people can hold simultaneous conversations over the line. The fiber-optic channel is fast enough to switch between conversations so quickly that each conversation appears uninterrupted. Similarly, each thread of a multithreaded process appears to run continuously with other threads.

If two operations are running “in parallel,” via either true multicore parallelism or simulated parallelism using time slicing, they are said to be concurrent. To implement such concurrency, you invoke it asynchronously, such that both the execution and the completion of the invoked operation are separate from the control flow that invoked it. Concurrency, therefore, occurs when work dispatched asynchronously executes in parallel with the current control flow. Parallel programming is the act of taking a single problem and splitting it into pieces, whereby you asynchronously initiate the process of each piece such that the pieces can all be processed concurrently.



Beginner Topic: Performance Considerations

A thread that is servicing an I/O bound operation can essentially be ignored by the operating system until the result is available from the I/O subsystem; switching away from an I/O bound thread to a processor-bound thread results in more efficient processor utilization because the CPU is not idle while waiting for the I/O operation to complete.

However, context switching is not free; the current internal state of the CPU must be saved to memory, and the state associated with the new thread must be loaded. If there are too many threads, the switching overhead can begin to noticeably affect performance. Adding more threads will likely decrease performance further, to the point where the processor spends more time switching from one thread to another than it does accomplishing the work of each thread.

Even if we ignore the cost of context switching, time slicing itself can have a huge impact on performance. Suppose, for example, that you have two processor-bound high-latency tasks, each working out the average of two lists of 1 billion numbers each. Suppose the processor can perform 1 billion operations per second. If the two tasks are each associated with a thread, and the two threads each have their own core, obviously we can get both results in 1 second.

If, however, we have a single processor that the two threads share, time slicing will perform a few hundred thousand operations on one thread, then switch to the other thread, then switch back, and so on. Each task will consume a total of 1 second of processor time, and the results of both will therefore be available after 2 seconds, leading to an average completion time of 2 seconds. (Again, we are ignoring the cost of context switching.)

If we assigned those two tasks to a single thread that performed the first task and did not even start the second until after the first was completed, the result of the first task would be obtained in 1 second and the result of the subsequent task would be obtained 1 second after that, leading to an average time of 1.5 seconds (a task completes in either 1 or 2 seconds and, therefore, on average completes in 1.5 seconds).


Guidelines

DO NOT fall into the common error of believing that adding more threads always makes code faster.

DO carefully measure performance when attempting to speed up processor-bound problems through multithreading.




Beginner Topic: Threading Problems

We’ve said several times that writing multithreaded programs is complex and difficult, but we have not said why. In a nutshell, the problem is that many of our reasonable assumptions that are true of single-threaded programs are violated in multithreaded programs. The issues include a lack of atomicity, race conditions, complex memory models, and deadlocks.

Most Operations Are Not Atomic

An atomic operation is one that always is observed to be either not started or already completed. Its state is never externally visible as “in progress.” Consider, for example, this code fragment:

if (bankAccounts.Checking.Balance >= 1000.00m)
{
    bankAccounts.Checking.Balance -= 1000.00m;
    bankAccounts.Savings.Balance += 1000.00m;
}

This operation—checking for available funds, and then conditionally debiting one account and crediting another—needs to be atomic. In other words, for it to execute correctly, we must ensure that there is never a moment when the operation can be observed to be partially completed. Imagine, for example, that two threads are running in this code concurrently. It is possible that both threads verify that there are sufficient funds in the account, and then both threads do a transfer of funds, even if there are only sufficient funds in the account to do the transfer once. And, in fact, the situation is considerably worse: There are no operations in this code fragment that are atomic! Even operations like compound addition/subtraction or reading and writing a property of decimal type are nonatomic operations in C#. As such, they can all be observed to be “partially complete” in multithreaded scenarios—only partially incremented or decremented. The observation of inconsistent state due to partially completed nonatomic operations is a special case of a more general problem, called a race condition.

Uncertainty Caused by Race Conditions

As we discussed earlier, concurrency is often simulated by time slicing. In the absence of special control flow structures (which we will discuss in the next chapter in detail), the operating system can switch contexts between any two threads at any time of its choosing. As a consequence, when two threads are accessing the same object, which thread “wins the race” and gets to run first is unpredictable. If there are two threads running in the code fragment given previously, for example, it is possible that one thread might win the race and get all the way to the end before the second thread gets a chance to run. It is also possible that the context switch might happen after the first thread does the balance check, and the second thread might then win the race to get all the way to the end first.

The behavior of code that contains race conditions depends on the timing of context switches. This dependency introduces uncertainty concerning program execution. The order in which one instruction will execute relative to an instruction in a different thread is unknown. The worst of it is that often code containing race conditions will behave correctly 99.9 percent of the time, and then one time in a thousand a different thread wins the race due to an accident of timing. This unpredictability is what makes multithreaded programming so difficult.

Because such race conditions are difficult to replicate in the laboratory, much of the quality assurance of multithreaded code depends on long-running stress tests, specially designed code analysis tools, and a significant investment in code analysis and code review by experts.

The following chapter is about techniques for dealing with race conditions.

Memory Models Are Complex

The existence of race conditions, where two points of control can “race” through a piece of code at unpredictable and inconsistent speeds, is bad enough, but it gets worse. Consider two threads that are running on two different processors, but are accessing the same fields of some object. Modern processors do not actually access main memory every time you use a variable. Rather, they make a local copy in special “cache” memory on the processor; these caches are then periodically synchronized with main memory. This means that two threads that read and write the same location on two different processors can, in fact, be failing to observe each other’s updates to that memory, or observing inconsistent results. Essentially what we have here is a race condition that depends on when processors choose to synchronize their caches.
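As an illustration (the class and field names here are hypothetical, and the exact behavior depends on the JIT compiler and processor), the following minimal sketch may loop forever in a release build, because nothing forces the worker thread ever to observe the main thread’s write to the nonvolatile field:

using System;
using System.Threading;

public class StaleReadExample
{
    // Not volatile: the JIT compiler and processor are free to
    // satisfy reads of this field from a register or local cache.
    private static bool _done;

    public static void Main()
    {
        Thread worker = new Thread(() =>
        {
            // This loop may never observe the main thread's write
            // to _done, and thus may never terminate.
            while(!_done) { }
            Console.WriteLine("Observed _done == true");
        });
        worker.Start();
        Thread.Sleep(100); // Illustration only; see the Sleep() caveats later.
        _done = true;      // This write may never become visible to the worker.
        worker.Join();
    }
}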

Locking Leads to Deadlocks

Clearly there must exist mechanisms to make nonatomic operations into atomic operations, to instruct the operating system to schedule threads so as to avoid races, and to ensure that processor caches are synchronized when necessary. The primary mechanism used to solve all these problems in C# programs is the lock statement. This statement allows the developer to identify a section of code as “critical” code that only one thread may be in at one time; if multiple threads try to enter the critical section, the operating system will suspend all but one. The operating system also ensures that processor caches are synchronized properly upon encountering a lock.
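For example, here is a minimal sketch (reusing the earlier bank account fragment and assuming a hypothetical _transferLock field) of using lock to make the transfer atomic:

// A single lock object guards the transfer.
private static readonly object _transferLock = new object();

// Only one thread at a time may execute this critical section,
// so the check and both balance updates behave as a single unit.
lock (_transferLock)
{
    if (bankAccounts.Checking.Balance >= 1000.00m)
    {
        bankAccounts.Checking.Balance -= 1000.00m;
        bankAccounts.Savings.Balance += 1000.00m;
    }
}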

However, locks introduce problems of their own. Most notably, if the order of lock acquisition between threads varies, a deadlock could occur such that threads freeze, each waiting for the other to release its lock.

For example, consider two threads that each attempt to acquire the same two locks, but in opposite orders.
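The following minimal sketch (the class, the lock fields, and the Sleep() calls are illustrative devices) reproduces the hazard; each thread acquires its first lock and then blocks waiting for the lock the other holds:

using System;
using System.Threading;

public class DeadlockExample
{
    private static readonly object LockA = new object();
    private static readonly object LockB = new object();

    public static void Main()
    {
        Thread thread1 = new Thread(() =>
        {
            lock (LockA)
            {
                Thread.Sleep(100); // Make the unlucky timing likely.
                lock (LockB) { } // Blocks: thread2 holds LockB.
            }
        });
        Thread thread2 = new Thread(() =>
        {
            lock (LockB)
            {
                Thread.Sleep(100);
                lock (LockA) { } // Blocks: thread1 holds LockA.
            }
        });
        thread1.Start();
        thread2.Start();
        thread1.Join(); // Never returns; the two threads deadlock.
    }
}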

At this point, each thread is waiting on the other thread before proceeding, so each thread is blocked, leading to an overall deadlock in the execution of that code.

We discuss various locking techniques in detail in Chapter 19.


Guidelines

DO NOT make an unwarranted assumption that any operation that is atomic in regular code will be atomic in multithreaded code.

DO NOT assume that all threads will observe all side effects of operations on shared memory in a consistent order.

DO ensure that code that concurrently holds multiple locks always acquires them in the same order.

AVOID all “race conditions”—that is, conditions where program behavior depends on how the operating system chooses to schedule threads.



Working with System.Threading

The Parallel Extensions library is extraordinarily useful because it allows you to manipulate a higher-level abstraction, the task, rather than working directly with threads. However, you might need to work with code written before the TPL and PLINQ were available (prior to .NET 4.0), or you might have a programming problem not directly addressed by them. In this section, we briefly cover some of the basic underlying APIs for directly manipulating threads.

Asynchronous Operations with System.Threading.Thread

The operating system implements threads and provides various unmanaged APIs to create and manage those threads. The CLR wraps these unmanaged threads and exposes them in managed code via the System.Threading.Thread class, an instance of which represents a “point of control” in the program. As mentioned earlier, you can think of a thread as a “worker” that independently follows the instructions that make up your program.

Listing 18.1 provides an example. The independent point of control is represented by an instance of Thread that runs concurrently. A thread needs to know which code to run when it starts up, so its constructor takes a delegate that refers to the code that is to be executed. In this case we convert a method group, DoWork, to the appropriate delegate type, ThreadStart. We then start the thread running by calling Start(). While the new thread is running, the main thread attempts to print 1,000 hyphens to the console. We instruct the main thread to then wait for the worker thread to complete its work by calling Join(). The result is shown in Output 18.1.

LISTING 18.1: Starting a Method Using System.Threading.Thread


using System;
using System.Threading;

public class RunningASeparateThread
{
    public const int Repetitions = 1000;

    public static void Main()
    {
        ThreadStart threadStart = DoWork;
        Thread thread = new Thread(threadStart);
        thread.Start();
        for(int count = 0; count < Repetitions; count++)
        {
            Console.Write('-');
        }
        thread.Join();
    }

    public static void DoWork()
    {
        for(int count = 0; count < Repetitions; count++)
        {
            Console.Write('+');
        }
    }
}


OUTPUT 18.1

++++++++++++++++++++++++++++++++----------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++-------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
------------------------------------------------------------------------
-------------------------------------------------------+++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++------------------------------------------------------
------------------------------------------------------------------------
-----------------------------------------------+++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++

As you can see, the threads appear to be taking turns executing, each printing out a few hundred characters before the context switches. The two loops are running “in parallel,” rather than the first one running to completion before the second one begins, as it would if the delegate had been executed synchronously.

For code to run under the context of a different thread, you need a delegate of type ThreadStart or ParameterizedThreadStart to identify the code to execute. (The latter allows for a single parameter of type object; both are found in the System.Threading namespace.) Given a Thread instance created using the thread-start delegate constructor, you can start the thread executing with a call to thread.Start(). (Listing 18.1 creates a variable of type ThreadStart explicitly to show the delegate type in the source code. The method group DoWork could have been passed directly to the thread constructor.) The call to Thread.Start() tells the operating system to begin concurrent execution of the new thread; control on the main thread immediately returns from the call and executes the for loop in the Main() method. The threads are now independent, and neither waits for the other until the call to Join().
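For example, here is a minimal sketch (mirroring Listing 18.1, with the character to print now supplied as the state argument) of the ParameterizedThreadStart variant:

using System;
using System.Threading;

public class ParameterizedExample
{
    public static void Main()
    {
        // DoWork(object) converts to ParameterizedThreadStart,
        // which allows a single argument of type object.
        Thread thread = new Thread(DoWork);
        thread.Start('+'); // The argument is passed to DoWork.
        thread.Join();
    }

    public static void DoWork(object state)
    {
        for(int count = 0; count < 1000; count++)
        {
            Console.Write(state);
        }
    }
}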

Thread Management

Threads include a number of methods and properties for managing their execution. Here are some of the basic ones:

• As we saw in Listing 18.1, you can cause one thread to wait for another with Join(). This tells the operating system to suspend execution of the current thread until the other thread is terminated. The Join() method is overloaded to take either an int or a TimeSpan to support a maximum time to wait for thread completion before continuing execution.

• By default, a new thread is a “foreground” thread; the operating system will terminate a process when all its foreground threads are complete. You can mark a thread as a “background” thread by setting the IsBackground property to true. The operating system will then allow the process to be terminated even if the background thread is still running. However, it is still a good idea to ensure that threads exit cleanly rather than being aborted when the process exits; see the section on thread aborting later in this chapter for more details.

• Every thread has an associated priority, which you can change by setting the Priority property to a new ThreadPriority enum value. The possible values are Lowest, BelowNormal, Normal, AboveNormal, and Highest. The operating system prefers to schedule time slices to higher-priority threads. Be careful; if you set the priorities incorrectly, you can end up with “starvation” situations where one high-priority thread prevents many low-priority threads from ever running.

• If you simply want to know whether a thread is still “alive” or has finished all of its work, you can use the Boolean IsAlive property. A more informative picture of a thread’s state is accessible through the ThreadState property. The ThreadState enum values are Aborted, AbortRequested, Background, Running, Stopped, StopRequested, Suspended, SuspendRequested, Unstarted, and WaitSleepJoin. These are flags; some of these values can be combined. (The sketch following this list exercises several of these members.)
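Here is a brief sketch (the worker’s output and the one-second timeout are arbitrary choices) exercising several of these members:

using System;
using System.Threading;

public class ThreadManagementExample
{
    public static void Main()
    {
        Thread worker = new Thread(DoWork);
        worker.IsBackground = true; // Do not keep the process alive.
        worker.Priority = ThreadPriority.BelowNormal;
        worker.Start();

        // Wait up to one second for the worker to finish.
        if(!worker.Join(TimeSpan.FromSeconds(1)))
        {
            Console.WriteLine(
                $"Still running; state = { worker.ThreadState }");
        }
        Console.WriteLine($"IsAlive = { worker.IsAlive }");
    }

    private static void DoWork()
    {
        for(int count = 0; count < 100000; count++)
        {
            Console.Write('.');
        }
    }
}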

There are two commonly used, and commonly abused, methods for controlling threads that deserve to be discussed in their own sections: Sleep() and Abort().

Do Not Put Threads to Sleep in Production Code

The static Thread.Sleep() method puts the current thread to sleep, essentially telling the operating system to not schedule any time slices to this thread until the given amount of time has passed. A single parameter—either a number of milliseconds or a TimeSpan—specifies how long the operating system will wait before continuing execution. While it is waiting, the operating system will, of course, schedule time slices for any other threads that might be waiting their turn to execute. This might sound like a sensible thing to do, but it is a “bad code smell” that indicates the design of the program could probably be better.

Threads are often put to sleep to try to synchronize a thread with some event in time. However, the operating system does not guarantee any level of precision in its timing. That is, if you say, “Put me to sleep for 123 milliseconds,” the operating system will put the thread to sleep for at least 123 milliseconds, and possibly much longer. The actual amount of time between the thread going to sleep and then waking up again is not deterministic and can be arbitrarily long. Do not attempt to use Thread.Sleep() as a high-precision timer, because it is not.

Worse, Thread.Sleep() is often used as a “poor man’s synchronization system.” That is, if you have some unit of asynchronous work, and the current thread cannot proceed until that work is done, you might be tempted to put the thread to sleep for much longer than you think the asynchronous work will take, in the hopes that it will be finished when the current thread wakes up. This is a bad idea: Asynchronous work, by its very nature, can take longer than you think. Use proper thread synchronization mechanisms, described in the next chapter, to synchronize threads. (We’ll give an example of this sort of abuse in Listing 18.2.)

Putting a thread to sleep is also a bad programming practice because it means that the sleeping thread is, obviously, unresponsive to attempts to run code on it. If you put the main thread of a Windows application to sleep, that thread will no longer be processing messages from the user interface, and will therefore appear to be hung.

More generally, putting a thread to sleep is a bad programming practice because the whole point of allocating an expensive resource like a thread is to get work out of that resource. You wouldn’t pay an employee to sleep, so do not pay the price of allocating an expensive thread only to put it to sleep for millions or billions of processor cycles.

That said, there are some valid uses of Thread.Sleep(). First, putting a thread to sleep with a time delay of zero tells the operating system “the current thread is politely giving up the rest of its quantum to another thread if there is one that can use it.” The polite thread will then be scheduled normally, without any further delay. Second, Thread.Sleep() is commonly used in test code to simulate a thread that is working on some high-latency operation without actually having to burn a processor doing some pointless arithmetic. Other uses in production code should be reviewed carefully to ensure that there is not a better way to obtain the desired effect.

In task-based asynchronous programming in C# 5, you can use the await operator on the result of the Task.Delay() method to introduce an asynchronous delay without blocking the current thread. See the “Timers” section in Chapter 19 for further detail.
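For example, here is a minimal sketch (assuming .NET 4.5 and C# 5.0; the delay and iteration count are arbitrary) of introducing such an asynchronous delay:

using System;
using System.Threading.Tasks;

public class DelayExample
{
    public static void Main()
    {
        PollAsync().Wait();
    }

    private static async Task PollAsync()
    {
        for(int count = 0; count < 3; count++)
        {
            Console.WriteLine($"Attempt { count }...");
            // Yields control instead of blocking the current thread.
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    }
}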


Guidelines

AVOID calling Thread.Sleep() in production code.


Do Not Abort Threads in Production Code

The Thread object has an Abort() method that, when executed, attempts to destroy the thread. It does so by causing the runtime to throw a ThreadAbortException in the thread; this exception can be caught, but even if it is caught and ignored, it is automatically rethrown to try to ensure that the thread is, in fact, destroyed. There are many reasons why it is a very bad idea to attempt to abort a thread. Here are some of them:

• The method promises only to try to abort the thread; there is no guarantee that it will succeed. For example, the runtime will not attempt to cause a ThreadAbortException if the point of control of the thread is currently inside a finally block (because critical cleanup code could be running right now and should not be interrupted) or is in unmanaged code (because doing so could corrupt the CLR itself). Rather, the CLR defers throwing the exception until control leaves the finally block or returns to managed code. But there is no guarantee that this ever happens. The thread being aborted might contain an infinite loop inside a finally block. (Ironically, the fact that the thread has an infinite loop might be the reason you are attempting to abort it in the first place.)

• The aborted thread might be in critical code protected by a lock statement. (See Chapter 19 for details.) Unlike a finally block, a lock will not prevent the exception. The critical code will be interrupted halfway through by the exception, and the lock object will be automatically released, allowing other code that is waiting on the lock object to enter the critical section and observe the state of the halfway-executed code. The whole point of locking is to prevent that scenario, so aborting a thread can transform what looks like thread-safe code into dangerously incorrect code.

• The CLR guarantees that its internal data structures will never be corrupted if a thread is aborted, but the BCL does not make this guarantee. Aborting a thread can leave any of your data structures or the BCL’s data structures in an arbitrarily bad state if the exception is thrown at the wrong time. Code running on other threads, or in the finally blocks of the aborted thread, can see this corrupted state and crash or behave badly.

In short, you should never abort a thread unless you are doing so as a last resort; ideally you should abort a thread only as part of a larger emergency shutdown whereby the entire AppDomain or the entire process is being destroyed. Fortunately, task-based asynchrony uses a more robust and safe cooperative cancellation pattern to terminate a “thread” whose results are no longer needed, as discussed in the next major section, “Asynchronous Tasks.”


Guidelines

AVOID aborting a thread in production code; doing so will yield unpredictable results and can destabilize a program.


Thread Pooling

As we discussed earlier, in the Beginner Topic titled “Performance Considerations,” it is possible for an excess of threads to negatively impact performance. Threads are expensive resources, thread context switching is not free, and running two jobs in simulated parallelism via time slicing can be hugely slower than running them one after the other.

To mitigate these problems, the BCL provides a thread pool. Instead of allocating threads directly, you can tell the thread pool which work you want to perform. When the work is finished, rather than the thread terminating and being destroyed, it is returned to the pool, saving on the cost of allocating a new thread when more work comes along. Listing 18.2 shows how to do the same thing as Listing 18.1, but this time with a pooled thread.

LISTING 18.2: Using ThreadPool Instead of Instantiating Threads Explicitly


using System;
using System.Threading;

public class Program
{
    public const int Repetitions = 1000;

    public static void Main()
    {
        ThreadPool.QueueUserWorkItem(DoWork, '+');

        for(int count = 0; count < Repetitions; count++)
        {
            Console.Write('-');
        }

        // Pause until the thread completes.
        // This is for illustrative purposes; do not
        // use Thread.Sleep for synchronization in
        // production code.
        Thread.Sleep(1000);
    }

    public static void DoWork(object state)
    {
        for(int count = 0; count < Repetitions; count++)
        {
            Console.Write(state);
        }
    }
}


The output of Listing 18.2 is similar to Output 18.1—that is, an intermingling of plus signs and hyphens. If we had a lot of different jobs to perform asynchronously, this pooling technique would provide more efficient execution on single-processor and multiprocessor computers. The efficiency is achieved by reusing threads over and over, rather than reconstructing them for every asynchronous call. Unfortunately, thread pool use is not without its pitfalls: There are still performance and synchronization problems to consider when using a thread pool.

To make efficient use of processors, the thread pool assumes that all the work you schedule on the thread pool will finish in a timely manner so that the thread can be returned to the thread pool and reused by another task. The thread pool also assumes that all the work will be of a relatively short duration (that is, consuming milliseconds or seconds of processor time, not hours or days). By making this assumption, it can ensure that each processor is working full out on a task, and not inefficiently time slicing multiple tasks, as described in the Beginner Topic on performance. The thread pool attempts to prevent excessive time slicing by “throttling” thread creation so that no one processor is “oversubscribed” with too many threads. The downside is that if all the threads in the pool are consumed by long-running or I/O-bound work, execution of queued-up work will be delayed.

Unlike Thread and Task, which are objects that you can manipulate directly, the thread pool does not provide a reference to the thread used to execute a given piece of work. This prevents the calling thread from synchronizing with, or controlling, the worker thread via the thread management functions described earlier in the chapter. In Listing 18.2 we use the “poor man’s synchronization” that we earlier discouraged; this would be a bad idea in production code because we do not actually know how long the work will take to complete.

In short, the thread pool does its job well, but that job does not include providing services to deal with long-running jobs or jobs that need to be synchronized with the main thread or with one another. What we really need to do is build a higher-level abstraction that can use threads and thread pools as an implementation detail; that abstraction is implemented by the Task Parallel Library, which is the topic of most of the rest of this chapter.

For more details on other techniques for managing worker threads that were commonly used prior to .NET 4, see the Essential C# 3.0 multithreading chapters at IntelliTect.com/EssentialCSharp.


Guidelines

DO use the thread pool to efficiently assign processor time to processor-bound tasks.

AVOID allocating a pooled worker thread to a task that is I/O bound or long-running; use TPL instead.



Asynchronous Tasks

Multithreaded programming includes the following complexities:

1. Monitoring an asynchronous operation state for completion: This includes determining when an asynchronous operation has completed, preferably not by polling the thread’s state or by blocking and waiting.

2. Thread pooling: This avoids the significant cost of starting and tearing down threads. In addition, thread pooling avoids the creation of too many threads, such that the system spends more time switching threads than running them.

3. Avoiding deadlocks: This involves preventing the occurrence of deadlocks while attempting to protect the data from simultaneous access by two different threads.

4. Providing atomicity across operations and synchronizing data access: Adding synchronization around groups of operations ensures that operations execute as a single unit and that they are not inappropriately interrupted by another thread. Locking is provided so that two different threads do not access the data simultaneously.

Furthermore, anytime a method is long-running, multithreaded programming will probably be required—that is, invoking the long-running method asynchronously. As developers write more multithreaded code, a common set of scenarios and programming patterns for handling those scenarios emerges.

C# 5.0 enhanced the programmability of one such pattern—TAP—by leveraging the TPL from .NET 4.0 and enhancing the C# language with new constructs to support it. This and the following section delve into the details of the TPL on its own and then the TPL with the async/await contextual keywords that simplify TAP programming. In the second half of Chapter 19, we consider several additional multithreading patterns that are important to be familiar with if the TPL and C# 5.0 are not available or you are programming against a non–TPL-based API.

From Thread to Task

Creating a thread is a relatively expensive operation, and each thread consumes a large amount (1 megabyte, by default) of virtual memory. We saw earlier in this chapter that it is potentially more efficient to use a thread pool to allocate threads when needed, assign asynchronous work to the thread, run the work to completion, and then reuse the thread for subsequent asynchronous work, rather than destroying the thread when the work is complete and creating a new one later.

In .NET Framework 4, instead of creating an operating system thread each time asynchronous work is started, the TPL creates a Task and tells the task scheduler that there is asynchronous work to perform. A task scheduler might use many different strategies to fulfill this purpose, but by default it requests a worker thread from the thread pool. The thread pool, as we’ve seen already, might decide that it is more efficient to run the task later, after some currently executing tasks have completed, or might decide to schedule the task’s worker thread to a particular processor. The thread pool determines whether it is more efficient to create an entirely new thread or to reuse an existing thread that previously finished executing.

By abstracting the concept of asynchronous work into the Task object, the TPL provides an object that represents asynchronous work and provides an object-oriented API for interacting with that work. Moreover, by providing an object that represents the unit of work, the TPL enables programmatically building up workflows by composing small tasks into larger ones, as we’ll see.

A task is an object that encapsulates work that executes asynchronously. This should sound familiar: A delegate is also an object that represents code. The difference between a task and a delegate is that delegates are synchronous and tasks are asynchronous. Executing a delegate, say, an Action, immediately transfers the point of control of the current thread to the delegate’s code; control does not return to the caller until the delegate is finished. By contrast, starting a task almost immediately returns control to the caller, no matter how much work the task has to perform. The task executes asynchronously, typically on another thread (though, as we will see later in this chapter, it is possible and even beneficial to execute tasks asynchronously with only one thread). A task essentially transforms a delegate from a synchronous to an asynchronous execution pattern.

Introducing Asynchronous Tasks

You know when a delegate is done executing on the current thread because the caller cannot do anything until the delegate is done. But how do you know when a task is done, and how do you get the result, if there is one? Consider the example of turning a synchronous delegate into an asynchronous task. We’ll do the same thing we did with threads in Listing 18.1 and thread pools in Listing 18.2, but this time with tasks: The task will write hyphens to the console, while the main thread writes plus signs.

Starting the task obtains a new thread from the thread pool, creating a second “point of control,” and executes the delegate on that thread. The point of control on the main thread continues normally after the call to start the task (Task.Run()). The results of Listing 18.3 are almost identical to Output 18.1.

LISTING 18.3: Invoking an Asynchronous Task


using System;
using System.Threading.Tasks;

public class Program
{
    public static void Main()
    {
        const int Repetitions = 10000;
        // Use Task.Factory.StartNew() for
        // TPL prior to .NET 4.5
        Task task = Task.Run(() =>
        {
            for(int count = 0;
                count < Repetitions; count++)
            {
                Console.Write('-');
            }
        });
        for(int count = 0; count < Repetitions; count++)
        {
            Console.Write('+');
        }

        // Wait until the Task completes
        task.Wait();
    }
}


The code that is to run in a new thread is defined in the delegate (of type Action in this case) passed to the Task.Run() method. This delegate (in the form of a lambda expression) prints out dashes to the console repeatedly. The loop that follows the starting of the task is almost identical, except that it displays plus signs.

Notice that following the call to Task.Run() the Action passed as the argument immediately starts executing. The Task is said to be “hot,” meaning that it has already been triggered to start executing—as opposed to a “cold” task, which needs to be explicitly started before the asynchronous work begins.

Although a Task can also be instantiated in a “cold” state via the Task constructor, doing so is generally appropriate only as an implementation detail internal to an API that returns an already running (“hot”) Task, one triggered by a call to Task.Start().

Notice that the exact state of a “hot” task is indeterminate immediately following the call to Run(). The state is instead determined by the operating system and whether it chooses to run the task’s worker thread immediately or delay it until additional resources are available. In fact, it is possible that the hot task is already finished by the time the code on the calling thread gets its turn to execute again. The call to Wait() forces the main thread to wait until all the work assigned to the task has completed executing. This is analogous to calling Join() on the worker thread, as we did in Listing 18.1.

In this scenario we have a single task, but it is also possible for many tasks to be running asynchronously. It is common to have a set of tasks where you want to wait for all of them to complete, or for any one of them to complete, before continuing execution of the current thread. The Task.WaitAll() and Task.WaitAny() methods do so.
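For example, a brief sketch (the task bodies are arbitrary placeholders) of calling both methods:

using System;
using System.Threading.Tasks;

public class WaitExample
{
    public static void Main()
    {
        // Use Task.Factory.StartNew() for TPL prior to .NET 4.5
        Task first = Task.Run(() => Console.Write('-'));
        Task second = Task.Run(() => Console.Write('+'));

        // Block until whichever task finishes first...
        int index = Task.WaitAny(first, second);
        Console.WriteLine($"Task { index } completed first.");

        // ...and then until both have completed.
        Task.WaitAll(first, second);
    }
}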

So far, we’ve seen how a task can take an Action and run it asynchronously. But what if the work executed in the task returns a result? We can use the Task<T> type to run a Func<T> asynchronously. When executing a delegate synchronously, we know that control will not return until the result is available. When executing a Task<T> asynchronously, we can poll it from one thread to see if it is done, and fetch the result when it is.³ Listing 18.4 demonstrates how to do so in a console application. Note that this sample uses a PiCalculator.Calculate() method that we will delve into further in the section “Executing Loop Iterations in Parallel.”

3. Exercise caution when using this polling technique. When creating a task from a delegate, as we have here, the task will be scheduled to run on a worker thread from the thread pool. As a consequence, the current thread will loop until the work is complete on the worker thread. This technique works, but it might consume CPU resources unnecessarily. Such a polling technique is dangerously broken if, instead of scheduling the task to run on a worker thread, you schedule the task to execute in the future on the current thread. Since the current thread is in a loop polling the task, it will loop forever because the task will not complete until the current thread exits the loop.

LISTING 18.4: Polling a Task<T>


using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Program
{
    public static void Main()
    {
        // Use Task.Factory.StartNew<string>() for
        // TPL prior to .NET 4.5
        Task<string> task =
            Task.Run<string>(
                () => PiCalculator.Calculate(100));

        foreach(
            char busySymbol in Utility.BusySymbols())
        {
            if(task.IsCompleted)
            {
                Console.Write('\b');
                break;
            }
            Console.Write(busySymbol);
        }

        Console.WriteLine();

        Console.WriteLine(task.Result);
        System.Diagnostics.Trace.Assert(
            task.IsCompleted);
    }
}

public class PiCalculator
{
    public static string Calculate(int digits = 100)
    {
        // ...
    }
}

public class Utility
{
    public static IEnumerable<char> BusySymbols()
    {
        string busySymbols = @"-\|/-\|/";
        int next = 0;
        while(true)
        {
            yield return busySymbols[next];
            next = (next + 1) % busySymbols.Length;
            yield return '\b';
        }
    }
}


This listing shows that the data type of the task is Task<string>. The generic type includes a Result property from which to retrieve the value returned by the Func<string> that the Task<string> executes.

Note that Listing 18.4 does not make a call to Wait(). Instead, reading from the Result property automatically causes the current thread to block until the result is available, if it isn’t already; in this case we know that it will already be complete when the result is fetched.

In addition to the IsCompleted and Result properties on Task<T>, there are several others worth noting:

• The IsCompleted property is set to true when a task completes, whether it completed normally or faulted (that is, ended because it threw an exception). More detailed information on the status of a task can be obtained by reading the Status property, which returns a value of type TaskStatus. Possible values are Created, WaitingForActivation, WaitingToRun, Running, WaitingForChildrenToComplete, RanToCompletion, Canceled, and Faulted. IsCompleted is true whenever the Status is RanToCompletion, Canceled, or Faulted. Of course, if the task is running on another thread and you read the status as “Running,” the status could change to “Completed” at any time, including immediately after you read the value of the property. The same is true of many other states—even Created could potentially change if a different thread starts the task. Only RanToCompletion, Canceled, and Faulted are final states from which a task can no longer transition.

• A task can be uniquely identified by the value of the Id property. The static Task.CurrentId property provides the identifier for the currently executing Task (that is, the task that is executing the Task.CurrentId call). These properties are especially useful when debugging.

• You can use the AsyncState property to associate additional data with a task. For example, imagine a List<T> whose values will be computed by various tasks. Each task could carry the index of its value in the AsyncState property. This way, when the task completes, the code can index into the list using the AsyncState (first casting it to an int).⁴ A brief sketch of this pattern follows the footnote below.

4. Be careful when using tasks to asynchronously mutate collections. The tasks might be running on worker threads, and the collection might not be thread safe. It is safer to fill in the collection from the main thread after the tasks are completed.
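Here is a minimal sketch of the pattern (the computation is an arbitrary placeholder; Task.Factory.StartNew() is used rather than Task.Run() because it provides overloads that accept a state object), filling in the collection from the main thread as footnote 4 recommends:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class AsyncStateExample
{
    public static void Main()
    {
        List<int> results = new List<int>(new int[3]);
        List<Task<int>> tasks = new List<Task<int>>();
        for(int i = 0; i < results.Count; i++)
        {
            // The second argument becomes the task's AsyncState.
            tasks.Add(Task.Factory.StartNew<int>(
                state => ((int)state) * ((int)state), i));
        }
        Task.WaitAll(tasks.ToArray());
        foreach(Task<int> task in tasks)
        {
            // Index into the list using AsyncState.
            results[(int)task.AsyncState] = task.Result;
        }
        Console.WriteLine(string.Join(", ", results));
    }
}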

There are other useful properties that we will discuss later in this chapter, in the section on task cancellation.

Task Continuation

We’ve talked several times about the “control flow” of a program without ever saying what the most fundamental nature of control flow is: Control flow determines what happens next. When you have a simple control flow like Console.WriteLine(x.ToString());, the control flow tells you that when ToString completes normally, the next thing that will happen is a call to WriteLine with the value returned as the argument. The concept of “what happens next” is called continuation; each point in a control flow has a continuation. In our example, the continuation of ToString is WriteLine (and the continuation of WriteLine is whatever code runs in the next statement). The idea of continuation is so elementary to C# programming that most programmers don’t even think about it; it’s part of the invisible air that they breathe. The act of C# programming is the act of constructing continuation upon continuation until the control flow of the entire program is complete.

Notice that the continuation of a given piece of code in a normal C# program will be executed immediately upon the completion of that code. When ToString() returns, the point of control on the current thread immediately does a synchronous call to WriteLine. Notice also that there are actually two possible continuations of a given piece of code: the “normal” continuation and the “exceptional” continuation that will be executed if the current piece of code throws an exception.

Asynchronous method calls, such as starting a Task, add an additional dimension to the control flow. With an asynchronous Task invocation, the control flow goes immediately to the statement after the Task.Start(), while at the same time it begins executing within the body of the Task delegate. In other words, “what happens next” when asynchrony is involved is multidimensional. Unlike with exceptions, where the continuation is just a different path, with asynchrony the continuation is an additional, parallel path.

Asynchronous tasks also allow composition of larger tasks out of smaller tasks by describing asynchronous continuations. Just as with regular control flow, a task can have different continuations to handle error situations, and tasks can be melded together by manipulating their continuations. There are several techniques for doing so, the most explicit of which is the ContinueWith() method (see Listing 18.5 and its corresponding output, Output 18.2).

LISTING 18.5: Calling Task.ContinueWith()


using System;
using System.Threading.Tasks;

public class Program
{
    public static void Main()
    {
        Console.WriteLine("Before");
        // Use Task.Factory.StartNew() for
        // TPL prior to .NET 4.5
        Task taskA =
            Task.Run(() =>
                Console.WriteLine("Starting..."))
            .ContinueWith(antecedent =>
                Console.WriteLine("Continuing A..."));
        Task taskB = taskA.ContinueWith(antecedent =>
            Console.WriteLine("Continuing B..."));
        Task taskC = taskA.ContinueWith(antecedent =>
            Console.WriteLine("Continuing C..."));
        Task.WaitAll(taskB, taskC);
        Console.WriteLine("Finished!");
    }
}


OUTPUT 18.2

Before
Starting...
Continuing A...
Continuing C...
Continuing B...
Finished!

The ContinueWith() method enables “chaining” two tasks together, such that when the predecessor task—the antecedent task—completes, the second task—the continuation task—is automatically started asynchronously. In Listing 18.5, for example, Console.WriteLine("Starting...") is the antecedent task body and Console.WriteLine("Continuing A...") is its continuation task body. The continuation task takes a Task as its argument (antecedent), thereby allowing the continuation task’s code to access the antecedent task’s completion state. When the antecedent task is completed, the continuation task starts automatically, asynchronously executing the second delegate, and passing the just-completed antecedent task as an argument to that delegate. Furthermore, since the ContinueWith() method returns a Task as well, that Task can be used as the antecedent of yet another Task, and so on, forming a continuation chain of Tasks that can be arbitrarily long.

If you call ContinueWith() twice on the same antecedent task (as Listing 18.5 shows with taskB and taskC representing continuation tasks for taskA), the antecedent task (taskA) has two continuation tasks, and when the antecedent task completes, both continuation tasks will be executed asynchronously. Notice that the order of execution of the continuation tasks from a single antecedent is indeterminate at compile time. Output 18.2 happens to show taskC executing before taskB, but in a second execution of the program, the order might be reversed. However, taskA will always execute before taskB and taskC because the latter are continuation tasks of taskA and, therefore, can’t start before taskA completes. Similarly, the Console.WriteLine("Starting...") delegate will always execute to completion before taskA (Console.WriteLine("Continuing A...")) because the latter is a continuation task of the former. Furthermore, “Finished!” will always appear last because of the call to Task.WaitAll(taskB, taskC) that blocks the control flow from continuing until both taskB and taskC complete.

Many different overloads of ContinueWith() are possible, and some of them take a TaskContinuationOptions value to tweak the behavior of the continuation chain. These values are flags, so they can be combined using the logical OR operator (|). A brief description of some of the possible flag values appears in Table 18.1; see the online MSDN documentation⁵ for more details.

5. MSDN, .NET Framework Developer Center, http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskcontinuationoptions(v=vs.110).aspx.


TABLE 18.1: List of Available TaskContinuationOptions Enums

The items denoted with a star (*) indicate under which conditions the continuation task will be executed; thus they are particularly useful for creating continuations that act like event handlers for the antecedent task’s behavior. Listing 18.6 demonstrates how an antecedent task can be given multiple continuations that execute conditionally, depending on how the antecedent task completed.

LISTING 18.6: Registering for Notifications of Task Behavior with ContinueWith()


using System;
using System.Threading.Tasks;
using System.Diagnostics;
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

public class Program
{
    public static void Main()
    {
        // Use Task.Factory.StartNew<string>() for
        // TPL prior to .NET 4.5
        Task<string> task =
            Task.Run<string>(
                () => PiCalculator.Calculate(10));

        Task faultedTask = task.ContinueWith(
            (antecedentTask) =>
            {
                Trace.Assert(antecedentTask.IsFaulted);
                Console.WriteLine(
                    "Task State: Faulted");
            },
            TaskContinuationOptions.OnlyOnFaulted);

        Task canceledTask = task.ContinueWith(
            (antecedentTask) =>
            {
                Trace.Assert(antecedentTask.IsCanceled);
                Console.WriteLine(
                    "Task State: Canceled");
            },
            TaskContinuationOptions.OnlyOnCanceled);

        Task completedTask = task.ContinueWith(
            (antecedentTask) =>
            {
                Trace.Assert(antecedentTask.IsCompleted);
                Console.WriteLine(
                    "Task State: Completed");
            },
            TaskContinuationOptions.OnlyOnRanToCompletion);

        completedTask.Wait();
    }
}


In this listing, we effectively register “listeners” for “events” on the antecedent’s task so that when the task completes normally or abnormally, the particular “listening” task will begin executing. This is a powerful capability, particularly if the original task is a “fire and forget” task—that is, a task that we start, hook up to continuation tasks, and then never refer to again.

In Listing 18.6, notice that the final Wait() call is on completedTask, not on task—the original antecedent task created with Task.Run(). Although each delegate’s antecedentTask is a reference to the parent (antecedent) task (task), from outside the delegate listeners we can effectively discard the reference to the original task. We can then rely solely on the continuation tasks that begin executing asynchronously without any need for follow-up code that checks the status of the original task.

In this case, we call completedTask.Wait() so that the main thread does not exit the program before the completed output appears (see Output 18.3).

OUTPUT 18.3

Task State: Completed

In this case, invoking completedTask.Wait() is somewhat contrived because we know that the original task will complete successfully. However, invoking Wait() on canceledTask or faultedTask will result in an exception. Those continuation tasks run only if the antecedent task is canceled or throws an exception; given that this will not happen in this program, those tasks will never be scheduled to run, and waiting for them to complete would throw an exception. The continuation options in Listing 18.6 happen to be mutually exclusive, so when the antecedent task runs to completion and the task associated with completedTask executes, the task scheduler automatically cancels the tasks associated with canceledTask and faultedTask. The canceled tasks end with their state set to Canceled. Therefore, calling Wait() (or any other invocation that would cause the current thread to wait for a task’s completion) on either of these tasks will throw an exception indicating that they are canceled.

A less contrived approach might be to call Task.WaitAny(completedTask, canceledTask, faultedTask), which will throw an AggregateException that then needs to be handled.

Unhandled Exception Handling on Task with AggregateException

When calling a method synchronously, we can wrap it in a try block with a catch clause to identify to the compiler which code we want to execute when an exception occurs. This does not work with an asynchronous call, however. We cannot simply wrap a try block around a call to Start() to catch an exception, because control immediately returns from the call, and control will then leave the try block, possibly long before the exception occurs on the worker thread. One solution is to wrap the body of the task delegate with a try/catch block. Exceptions thrown on and subsequently caught by the worker thread will consequently not present problems, as a try block works normally on the worker thread. This is not the case, however, for unhandled exceptions—those that the worker thread does not catch.
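
For instance, in the following sketch (the thrown IOException merely stands in for whatever the work might throw), the exception is caught on the worker thread itself, so the task still runs to completion:

Task task = Task.Run(() =>
{
    try
    {
        // ... work that might throw ...
        throw new IOException();
    }
    catch(IOException exception)
    {
        // Caught on the worker thread;
        // the task does not end up faulted
        Console.WriteLine($"Handled: { exception.Message }");
    }
});
task.Wait();  // Returns without throwing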

Generally (starting with version 2.0 of the CLR6), unhandled exceptions on any thread are treated as fatal, trigger the Windows Error Reporting dialog, and cause the application to terminate abnormally. All exceptions on all threads must be caught; if they are not, the application is not allowed to continue to run. (For some advanced techniques for dealing with unhandled exceptions, see the upcoming Advanced Topic titled “Dealing with Unhandled Exceptions on a Thread.”) Fortunately, this is not the case for unhandled exceptions in an asynchronously running task. Rather, the task scheduler inserts a “catchall” exception handler around the delegate so that if the task throws an otherwise unhandled exception, the catchall handler will catch it and record the details of the exception in the task, avoiding any trigger of the CLR automatically terminating the process.

6. In version 1.0 of the CLR, an unhandled exception on a worker thread terminated the thread but not the application. As a result, it was possible for a buggy program to have all its worker threads die, but the main thread would continue to run, even though the program was no longer doing any work. This is a confusing situation for users to be in; it is better to signal to the user that the application is in a bad state and terminate it before it can do any more harm.

As we saw in Listing 18.6, one technique for dealing with a faulted task is to explicitly create a continuation task that is the “fault handler” for that task; the task scheduler will automatically schedule the continuation when it detects that the antecedent task threw an unhandled exception. If no such handler is present, however, and Wait() (or an attempt to get the Result) executes on a faulted task, an AggregateException will be thrown (see Listing 18.7 and Output 18.4).

LISTING 18.7: Handling a Task’s Unhandled Exception


using System;
using System.Threading.Tasks;

public class Program
{
public static void Main()
{
// Use Task.Factory.StartNew<string>() for
// TPL prior to .NET 4.5
Task task = Task.Run(() =>
{
throw new InvalidOperationException();
});

try
{
task.Wait();
}
catch(AggregateException exception)
{
exception.Handle(eachException =>
{
Console.WriteLine(
$"ERROR: { eachException.Message }");
return true;
});
}
}
}


OUTPUT 18.4

ERROR: Operation is not valid due to the current state of the object.

The aggregate exception is so named because it may contain many exceptions collected from one or more faulted tasks. Imagine, for example, asynchronously executing ten tasks in parallel and five of them throwing exceptions. To report all five exceptions and have them handled in a single catch block, the framework uses the AggregateException as a means of collecting the exceptions and reporting them as a single exception. Furthermore, since it is unknown at compile time whether a worker task will throw one or more exceptions, an unhandled faulted task will always throw an AggregateException. Listing 18.7 and Output 18.4 demonstrate this behavior. Even though the unhandled exception thrown on the worker thread was of type InvalidOperationException, the type of the exception caught on the main thread is still an AggregateException. Also, as expected, catching the exception requires an AggregateException catch block.

A list of the exceptions contained within an AggregateException is available from the InnerExceptions property. As a result, you can iterate over this property to examine each exception and determine the appropriate course of action. Alternatively, and as shown in Listing 18.7, you can use the AggregateException.Handle() method, specifying an expression to execute against each individual exception contained within the AggregateException. One important characteristic of the Handle() method to consider, however, is that it is a predicate. As such, the predicate should return true for any exception that the Handle() delegate successfully addresses. If the predicate returns false for any exception, the Handle() method will throw a new AggregateException that contains the composite list of such unhandled exceptions.
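
For instance, in the following sketch, only InvalidOperationException is considered handled; if the aggregate contains any other exception type, Handle() throws a new AggregateException containing those leftover exceptions:

catch(AggregateException exception)
{
    exception.Handle(eachException =>
        eachException is InvalidOperationException);
}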

You can also observe the state of a faulted task without causing the exception to be rethrown on the current thread by simply looking at the Exception property of the task. Listing 18.8 demonstrates this approach by waiting for the completion of a fault continuation of a task7 that we know will throw an exception.

7. As we discussed earlier, waiting for a fault continuation to complete is a strange thing to do because most of the time it will never be scheduled to run in the first place. This code is provided for illustrative purposes only.

LISTING 18.8: Observing Unhandled Exceptions on a Task Using ContinueWith()


using System;
using System.Diagnostics;
using System.Threading.Tasks;

public class Program
{
public static void Main()
{
bool parentTaskFaulted = false;

Task task = new Task(() =>
{
throw new InvalidOperationException();
});
Task continuationTask = task.ContinueWith(
(antecedentTask) =>
{
parentTaskFaulted =
antecedentTask.IsFaulted;
}, TaskContinuationOptions.OnlyOnFaulted);
task.Start();
continuationTask.Wait();
Trace.Assert(parentTaskFaulted);
Trace.Assert(task.IsFaulted);
task.Exception.Handle(eachException =>
{
Console.WriteLine(
$"ERROR: { eachException.Message }");
return true;
});
}
}


Notice that to retrieve the unhandled exception on the original task, we use the Exception property. The result is output identical to Output 18.4.

If an exception that occurs within a task goes entirely unobserved—that is, (1) it isn’t caught from within the task; (2) the completion of the task is never observed via Wait(), Result, or accessing the Exception property, for example; and (3) the faulted ContinueWith() is never observed—then the exception is likely to go unhandled entirely, resulting in a process-wide unhandled exception. In .NET 4.0, such a faulted task would get rethrown by the finalizer thread and likely crash the process. In .NET 4.5, by contrast, the crash is suppressed (although the CLR can be configured for the crashing behavior if preferred).

In either case, you can register for an unhandled task exception via the TaskScheduler.UnobservedTaskException event.
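
A minimal registration sketch follows; calling SetObserved() marks the exception as handled so that it will not escalate further (a distinction that matters chiefly under the .NET 4.0 policy):

TaskScheduler.UnobservedTaskException +=
    (sender, eventArgs) =>
    {
        eventArgs.SetObserved();
        foreach(Exception item in
            eventArgs.Exception.Flatten().InnerExceptions)
        {
            Console.WriteLine($"Unobserved: { item.Message }");
        }
    };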


Advanced Topic: Dealing with Unhandled Exceptions on a Thread

As we discussed earlier, an unhandled exception on any thread by default causes the application to shut down. An unhandled exception is a fatal, unexpected bug, and the exception may have occurred because a crucial data structure is corrupt. You therefore have no idea what the program could possibly be doing, so the safest thing to do is to shut down the whole thing immediately.

Ideally, no programs would ever throw unhandled exceptions on any thread; programs that do so have bugs, and the best course of action is to find and fix the bug before the software is shipped to customers. However, rather than shutting down an application as soon as possible when an unhandled exception occurs, it is often desirable to save any working data and/or log the exception for error reporting and future debugging. This requires a mechanism to register notifications of unhandled exceptions.

Every AppDomain provides such a mechanism, and to observe the unhandled exceptions that occur in an AppDomain, you must add a handler to the UnhandledException event. The UnhandledException event will fire for all unhandled exceptions on threads within the application domain, whether it is the main thread or a worker thread. Note that the purpose of this mechanism is notification; it does not permit the application to recover from the unhandled exception and continue executing. After the event handlers run, the application will display the Windows Error Reporting dialog, and then the application will exit. (For console applications, the exception details will also appear on the console.)

In Listing 18.9, we show how to create a second thread that throws an exception, which is then handled by the application domain’s unhandled exception event handler. For demonstration purposes, to ensure that thread timing issues do not come into play, we insert some artificial delays using Thread.Sleep. Output 18.5 shows the results.

LISTING 18.9: Registering for Unhandled Exceptions


using System;
using System.Diagnostics;
using System.Threading;

public class Program
{
public static Stopwatch clock = new Stopwatch();
public static void Main()
{
try
{
clock.Start();
// Register a callback to receive notifications
// of any unhandled exception.
AppDomain.CurrentDomain.UnhandledException +=
(s, e) =>
{
Message("Event handler starting");
Delay(4000);
};

Thread thread = new Thread(() =>
{
Message("Throwing exception.");
throw new Exception();
});
thread.Start();

Delay(2000);
}
finally
{
Message("Finally block running.");
}
}

static void Delay(int i)
{
Message($"Sleeping for {i} ms");
Thread.Sleep(i);
Message("Awake");
}

static void Message(string text)
{
Console.WriteLine("{0}:{1:0000}:{2}",
Thread.CurrentThread.ManagedThreadId,
clock.ElapsedMilliseconds, text);
}
}


OUTPUT 18.5

3:0047:Throwing exception.
3:0052:Unhandled exception handler starting.
3:0055:Sleeping for 4000 ms
1:0058:Sleeping for 2000 ms
1:2059:Awake
1:2060:Finally block running.
3:4059:Awake
Unhandled Exception: System.Exception: Exception of type 'System.
Exception' was thrown.

As you can see in Output 18.5, the new thread is assigned thread ID 3 and the main thread is assigned thread ID 1. The operating system schedules thread 3 to run for a while; it throws an unhandled exception, the event handler is invoked, and it goes to sleep. Soon thereafter, the operating system realizes that thread 1 can be scheduled, but its code immediately puts it to sleep. Thread 1 wakes up first and runs the finally block, and then 2 seconds later thread 3 wakes up, and the unhandled exception finally crashes the process.

This sequence of events—the event handler executing, and the process crashing after it is finished—is typical, but not guaranteed. The moment there is an unhandled exception in your program, all bets are off; the program is now in an unknown and potentially very unstable state, so its behavior can be unpredictable. In this case, as you can see, the CLR allows the main thread to continue running and executes its finally block, even though it knows by the time that control gets to the finally block, another thread is in the AppDomain’s unhandled exception event handler.

To emphasize this fact, try changing the delays so that the main thread sleeps longer than the event handler. In that scenario, the finally block will never execute! The process will be destroyed by the unhandled exception before thread 1 wakes up. You can also get different results depending on whether the exception-throwing thread is or is not created by the thread pool. The best practice, therefore, is to avoid all possible unhandled exceptions, whether they occur in worker threads or in the main thread.

How does this pertain to tasks? What if there are unfinished tasks hanging around the system when you want to shut it down? We’ll look at task cancellation in the next section.


Guidelines

AVOID writing programs that produce unhandled exceptions on any thread.

CONSIDER registering an unhandled exception event handler for debugging, logging, and emergency shutdown purposes.

DO cancel unfinished tasks, rather than allowing them to run during application shutdown.



Canceling a Task

Earlier in this chapter, we described why it’s a bad idea to rudely abort a thread so as to cancel a task being performed by that thread. The TPL uses cooperative cancellation, a far more polite, robust, and reliable technique for safely canceling a task that is no longer needed. A task that supports cancellation monitors a CancellationToken object (found in the System.Threading namespace) by periodically polling it to see whether a cancellation request has been issued. Listing 18.10 demonstrates both the cancellation request and the response to the request. Output 18.6 shows the results.

LISTING 18.10: Canceling a Task Using CancellationToken


using System;
using System.Threading;
using System.Threading.Tasks;
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

public class Program
{
public static void Main()
{
string stars =
"*".PadRight(Console.WindowWidth-1, '*');
Console.WriteLine("Push ENTER to exit.");

CancellationTokenSource cancellationTokenSource =
new CancellationTokenSource();
// Use Task.Factory.StartNew<string>() for
// TPL prior to .NET 4.5
Task task = Task.Run(
() =>
WritePi(cancellationTokenSource.Token),
cancellationTokenSource.Token);

// Wait for the user's input
Console.ReadLine();

cancellationTokenSource.Cancel();
Console.WriteLine(stars);
task.Wait();
Console.WriteLine();
}

private static void WritePi(
CancellationToken cancellationToken)
{
const int batchSize = 1;
string piSection = string.Empty;
int i = 0;

while(!cancellationToken.IsCancellationRequested
&& i < int.MaxValue)
{
piSection = PiCalculator.Calculate(
batchSize, (i++) * batchSize);
Console.Write(piSection);
}
}
}


OUTPUT 18.6

Push ENTER to exit.
3.141592653589793238462643383279502884197169399375105820974944592307816
40628620899862803482534211706798214808651328230664709384460955058223172
5359408128481117450
***********************************************************************
2

After starting the task, a Console.ReadLine() call blocks the main thread. At the same time, the task continues to execute, calculating the next digit of pi and printing it out. Once the user presses Enter, the execution encounters a call to CancellationTokenSource.Cancel(). In Listing 18.10, we split the call to cancellationTokenSource.Cancel() from the call to task.Wait() and print out a line of asterisks in between. The purpose of this step is to show that quite possibly an additional iteration will occur before the cancellation token is observed—hence the additional 2 in Output 18.6 following the stars. The 2 appears because CancellationTokenSource.Cancel() doesn’t rudely stop the task from executing. The task keeps on running until it checks the token, and politely shuts down when it sees that the owner of the token is requesting cancellation of the task.

The Cancel() call effectively sets the IsCancellationRequested property on all cancellation tokens copied from CancellationTokenSource.Token. There are a few things to note, however:

• A CancellationToken, not a CancellationTokenSource, is given to the asynchronous task. A CancellationToken enables polling for a cancellation request; the CancellationTokenSource provides the token and signals it when it is canceled (see Figure 18.2). By passing the CancellationToken rather than the CancellationTokenSource, we don’t have to worry about thread synchronization issues on the CancellationTokenSource because the latter remains accessible to only the original thread.

Image

FIGURE 18.2: CancellationTokenSource and CancellationToken Class Diagrams

• A CancellationToken is a struct, so it is copied by value. Reading CancellationTokenSource.Token produces a copy of the token. For this reason, CancellationToken is thread safe—in this listing, the copy is available only from within the WritePi() method.

To monitor the IsCancellationRequested property, a copy of the CancellationToken (retrieved from CancellationTokenSource.Token) is passed to the task. In Listing 18.10, we then occasionally check the IsCancellationRequested property on the CancellationToken parameter; in this case, we check after each digit calculation. If IsCancellationRequested returns true, the while loop exits. Unlike a thread abort, which would throw an exception at essentially a random point, we exit the loop using normal control flow. We guarantee that the code is responsive to cancellation requests by polling frequently.

One other point to note about the CancellationToken is the overloaded Register() method. Via this method, you can register an action that will be invoked when the token is canceled. In other words, calling the Register() method subscribes a listener delegate to the corresponding CancellationTokenSource’s Cancel().
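
A minimal sketch follows; the CancellationTokenRegistration returned by Register() can be disposed if the callback is no longer needed:

CancellationTokenRegistration registration =
    cancellationTokenSource.Token.Register(
        () => Console.WriteLine("Cancellation requested."));
// ...
registration.Dispose();  // Unregisters the callback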

Given that canceling before completing is the expected behavior in this program, the code in Listing 18.10 does not throw a System.Threading.Tasks.TaskCanceledException. As a consequence, task.Status will return TaskStatus.RanToCompletion—providing no indication that the work of the task was, in fact, canceled. In this example, there is no need for such an indication; however, the TPL does include the capability to provide one. If the cancel call were disruptive in some way—preventing a valid result from returning, for example—throwing a TaskCanceledException (which derives from System.OperationCanceledException) would be the TPL pattern for reporting it. Instead of throwing the exception explicitly, CancellationToken includes a ThrowIfCancellationRequested() method to report the exception more easily, assuming an instance of CancellationToken is available.
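
The following sketch shows the resulting pattern; the loop body is a placeholder, and passing the same token to Task.Run() allows the TPL to transition the task to the Canceled state when the exception propagates:

Task task = Task.Run(() =>
{
    while(true)
    {
        // Throws OperationCanceledException
        // once cancellation is requested
        cancellationToken.ThrowIfCancellationRequested();
        // ... perform the next unit of work ...
    }
}, cancellationToken);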

If you attempt to call Wait() (or obtain the Result) on a task that threw TaskCanceledException, the behavior is the same as if any other exception had been thrown in the task: The call will throw an AggregateException. The exception is a means of communicating that the state of execution following the task is potentially incomplete. Unlike a successfully completed task in which all expected work executed successfully, a canceled task potentially has partially completed work—the state of the work is untrusted.

This example demonstrates how a long-running processor-bound operation (calculating pi almost indefinitely) can monitor for a cancellation request and respond if one occurs. There are some cases, however, when cancellation can occur without explicitly coding for it within the target task. For example, the Parallel class discussed later in the chapter offers such a behavior by default.

Begin 5.0

Task.Run(): A Shortcut and Simplification to Task.Factory.StartNew()

In .NET 4.0, the general practice for obtaining a task was to call Task.Factory.StartNew(). In .NET 4.5, a simpler calling structure was provided in Task.Run(). Like Task.Run(), Task.Factory.StartNew() could be used in C# 4.0 scenarios to invoke CPU-intensive methods that require an additional thread to be created.

Given .NET 4.5, Task.Run() should be used by default unless it proves insufficient. For example, if you need to control the task with TaskCreationOptions, if you need to specify an alternative scheduler, or if, for performance reasons, you want to pass in object state, you should consider using Task.Factory.StartNew(). Only in rare cases, where you need to separate creation from scheduling, should constructor instantiation followed by a call to Start() be considered.

Listing 18.11 provides an example of using Task.Factory.StartNew().

LISTING 18.11: Using Task.Factory.StartNew()


public Task<string> CalculatePiAsync(int digits)
{
return Task.Factory.StartNew<string>(
() => CalculatePi(digits));
}

private string CalculatePi(int digits)
{
// ...
}


End 5.0

Long-Running Tasks

As we discussed earlier in the commentary on Listing 18.2, the thread pool assumes that work items will be processor-bound and relatively short-lived; it makes these assumptions to effectively throttle the number of threads created. This prevents both overallocation of expensive thread resources and oversubscription of processors that would lead to excessive context switching and time slicing.

But what if the developer knows that a task will be long-running and, therefore, will hold on to an underlying thread resource for a long time? In this case, the developer can notify the scheduler that the task is unlikely to complete its work anytime soon. This has two effects. First, it hints to the scheduler that perhaps a dedicated thread ought to be created specifically for this task, rather than attempting to use a thread from the thread pool. Second, it hints to the scheduler that perhaps this would be a good time to allow more tasks to be scheduled than there are processors to handle them. This will cause more time slicing to happen, which is a good thing. We do not want one long-running task to hog an entire processor and prevent shorter-running tasks from using it. The short-running tasks will be able to use their time slice to finish a large percentage of their work, and the long-running task is unlikely to notice the relatively slight delays caused by sharing a processor with other tasks. To accomplish this, use the TaskCreationOptions.LongRunning option when calling StartNew(), as shown in Listing 18.12. (Task.Run() does not support a TaskCreationOptions parameter.)

LISTING 18.12: Cooperatively Executing Long-Running Tasks


using System.Threading.Tasks;

// ...

Task task = Task.Factory.StartNew(
() =>
WritePi(cancellationTokenSource.Token),
TaskCreationOptions.LongRunning);
// ...



Guidelines

DO inform the task factory that a newly created task is likely to be long-running so that it can manage it appropriately.

DO use TaskCreationOptions.LongRunning sparingly.


Tasks Are Disposable

Note that Task also supports IDisposable. This is necessary because Task may allocate a WaitHandle when you wait for it to complete; since WaitHandle supports IDisposable, Task also supports IDisposable in accordance with best practices. However, readers will note that the preceding code samples do not include a Dispose() call, nor do they rely on such a call implicitly via the using statement. Instead, the listings rely on an automatic WaitHandle finalizer invocation when the program exits.

This approach leads to two notable results. First, the handles live longer and hence consume more resources than they ought to. Second, the garbage collector is slightly less efficient because finalized objects survive into the next generation. However, both of these concerns are inconsequential in the Task case unless an extraordinarily large number of tasks are being finalized. Therefore, even though technically speaking all code should be disposing of tasks, you needn’t bother to do so unless performance metrics require it and it’s easy—that is, if you’re certain that Tasks have completed and no other code is using them.
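
Where metrics do justify it, the pattern itself is simple—a sketch, assuming the task is known to have completed and nothing else references it (DoWork() is a placeholder):

Task task = Task.Run(() => DoWork());
task.Wait();
task.Dispose();  // Safe only once the task has completed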

The Task-Based Asynchronous Pattern

As we’ve seen so far, tasks provide a better abstraction for the manipulation of asynchronous work than threads do. Tasks are automatically scheduled to the right number of threads and large tasks can be composed by chaining together small tasks, just as large programs can be composed from multiple small methods.

However, there are some drawbacks to tasks. The principal difficulty with tasks is that they turn your program logic “inside out.” To illustrate this, we first consider a synchronous method that is blocked on an I/O-bound, high-latency operation—a web request. Next, we compare it to an asynchronous version prior to C# 5.0 and the Task-based Asynchronous Pattern (TAP). Lastly, we revise the same example by using C# 5.0 (and higher) and the async/await contextual keywords.

Synchronously Invoking a High-Latency Operation

In Listing 18.13, the code uses a WebRequest to download a web page and display its size. If the operation fails, an exception is thrown.

LISTING 18.13: A Synchronous Web Request


using System;
using System.IO;
using System.Net;
using System.Linq;

public class Program
{
public static void Main(string[] args)
{
string url = "http://www.IntelliTect.com";
if(args.Length > 0)
{
url = args[0];
}

try
{
Console.Write(url);
WebRequest webRequest =
WebRequest.Create(url);

WebResponse response =
webRequest.GetResponse();

Console.Write(".....");

using(StreamReader reader =
new StreamReader(
response.GetResponseStream()))
{
string text =
reader.ReadToEnd();
Console.WriteLine(
FormatBytes(text.Length));
}
}
catch(WebException)
{
// ...
}
catch(IOException )
{
// ...
}
catch(NotSupportedException )
{
// ...
}
}

static public string FormatBytes(long bytes)
{
string[] magnitudes =
new string[] { "GB", "MB", "KB", "Bytes" };
long max =
(long)Math.Pow(1024, magnitudes.Length);

return string.Format("{1:##.##} {0}",
magnitudes.FirstOrDefault(
magnitude =>
bytes > (max /= 1024)) ?? "0 Bytes",
(decimal)bytes / (decimal)max);
}
}


The logic in Listing 18.13 is relatively straightforward—using common C# idioms like try/catch blocks and return statements to describe the control flow. Given a WebRequest, this code calls GetResponse() to download the page. To gain stream access to the page, it calls GetResponseStream() and assigns the result to a StreamReader. Finally, it reads to the end of the stream with ReadToEnd() to determine the size of the page, and then prints it out to the screen.

The problem with this approach is, of course, that the calling thread is blocked until the I/O operation completes; this wastes a thread that could be doing useful work while the operation is in flight. For this reason, we cannot, for example, execute any other code in the meantime, such as code that indicates progress.

Asynchronously Invoking a High-Latency Operation Using the TPL

To address this problem, Listing 18.14 takes a similar approach but instead uses task-based asynchrony with the TPL.

LISTING 18.14: An Asynchronous Web Request


using System;
using System.IO;
using System.Net;
using System.Linq;
using System.Threading.Tasks;
using System.Runtime.ExceptionServices;

public class Program
{
public static void Main(string[] args)
{
string url = "http://www.IntelliTect.com";
if(args.Length > 0)
{
url = args[0];
}

Console.Write(url);

Task task = WriteWebRequestSizeAsync(url);

try
{
while(!task.Wait(100))
{
Console.Write(".");
}
}
catch(AggregateException exception)
{
exception = exception.Flatten();
try
{
exception.Handle(innerException =>
{
// Rethrowing rather than using
// if condition on the type.
ExceptionDispatchInfo.Capture(
innerException)
.Throw();
return true;
});
}
catch(WebException)
{
// ...
}
catch(IOException )
{
// ...
}
catch(NotSupportedException )
{
// ...
}
}
}


private static Task WriteWebRequestSizeAsync(
string url)
{
StreamReader reader = null;
WebRequest webRequest =
WebRequest.Create(url);

Task task =
webRequest.GetResponseAsync()
.ContinueWith( antecedent =>
{
WebResponse response =
antecedent.Result;

reader =
new StreamReader(
response.GetResponseStream());
return reader.ReadToEndAsync();
})
.Unwrap()
.ContinueWith(antecedent =>
{
if(reader != null) reader.Dispose();
string text = antecedent.Result;
Console.WriteLine(
FormatBytes(text.Length));
});

return task;
}

// ...
}


Unlike Listing 18.13, when Listing 18.14 executes, it prints periods to the console while the page is downloading. The result is that instead of simply printing four periods (“....”) to the console, Listing 18.14 is able to continuously print periods for as long as it takes to download the file, read it from the stream, and determine its size.

Unfortunately, this asynchrony comes at the cost of complexity. Interspersed throughout the code is TPL-related code that interrupts the flow. Rather than simply following the WebRequest.GetResponseAsync() call with steps to retrieve the StreamReader and call ReadToEndAsync(), the asynchronous version of the code requires ContinueWith() statements. The first ContinueWith() statement identifies what to execute after WebRequest.GetResponseAsync(). Notice that the return statement in the first ContinueWith() expression returns StreamReader.ReadToEndAsync(), which returns another Task.

Without the Unwrap() call, therefore, the antecedent in the second ContinueWith() statement is a Task<Task<string>>, which alone indicates the complexity. As a result, it would be necessary to call Result twice—once on the antecedent directly and a second time on the Task<string> that antecedent.Result returned, with the latter blocking subsequent execution until the ReadToEnd() operation completes. To avoid the Task<Task<TResult>> structure, we preface the call to ContinueWith() with a call to Unwrap(), thereby shedding the outer Task and appropriately handling any errors or cancellation requests.
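
The type flattening can be seen in isolation in the following sketch, which is concerned only with the types involved (how the webRequest and reader variables from Listing 18.14 get initialized is ignored here):

Task<Task<string>> nested =
    webRequest.GetResponseAsync()
        .ContinueWith(antecedent =>
            reader.ReadToEndAsync());
Task<string> unwrapped = nested.Unwrap();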

The complexity doesn’t stop with Tasks and ContinueWith(), however: The exception handling adds an entirely new dimension to the complexity. As mentioned earlier, the TPL generally throws an AggregateException exception because of the possibility that an asynchronous operation could encounter multiple exceptions. However, because we are calling the Result property from within ContinueWith() blocks, it is possible that inside the worker thread we might also throw an AggregateException.

As you learned earlier in the chapter, there are multiple ways to handle these exceptions:

1. We can add continuation tasks to all *Async methods that return a task along with each ContinueWith() method call. However, doing so would prevent us from using the fluid API in which the ContinueWith() statements are chained together one after the other. Furthermore, this would force us to deeply embed error-handling logic into the control flow rather than simply relying on exception handling.

2. We can surround each delegate body with a try/catch block so that no exceptions go unhandled from the task. Unfortunately, this approach is less than ideal as well. First, some exceptions (like those triggered when calling antecedent.Result) will throw an AggregateException from which we will need to unwrap the InnerException(s) to handle them individually. Upon unwrapping them, we either rethrow them so as to catch a specific type or conditionally check for the type of the exception separately from any other catch blocks (even catch blocks for the same type). Second, each delegate body will require its own separate try/catch handler, even if some of the exception types between blocks are the same. Third, Main’s call to task.Wait() could still throw an exception because WebRequest.GetResponseAsync() could potentially throw an exception, and there is no way to surround it with a try/catch block. Therefore, there is no way to eliminate the try/catch block in Main that surrounds task.Wait().

3. We can ignore all exception handling from within WriteWebRequestSizeAsync() and instead rely solely on the try/catch block that surrounds Main’s task.Wait(). Given that we know the exception will be an AggregateException, we can have a catch for only that exception. Within the catch block, we can handle the exception by calling AggregateException.Handle() and throwing each exception using the ExceptionDispatchInfo object so as not to lose the original stack trace. These exceptions are then caught by the expected exception handlers and addressed accordingly. Notice, however, that before handling the AggregateException’s InnerExceptions, we first call AggregateException.Flatten(). This step addresses the issue of an AggregateException wrapping inner exceptions that are also of type AggregateException (and so on). By calling Flatten(), we ensure that all exceptions are moved to the first level and all contained AggregateExceptions are removed.

As shown in Listing 18.14, option 3 is probably the preferred approach because it keeps the exception handling outside the control flow for the most part. This doesn’t eliminate the error-handling complexity entirely; rather, it simply minimizes the occasions on which it is interspersed within the regular control flow.

The asynchronous version in Listing 18.14 has almost the same logical control flow as the synchronous version in Listing 18.13: Both versions attempt to download a resource from a server and, if the download succeeds, return the result. (If the download fails, the exception’s type is interrogated to determine the right course of action.) However, it is clear that the asynchronous version of Listing 18.14 is significantly more difficult to read, understand, and change than the corresponding synchronous version in Listing 18.13. Unlike the synchronous version, which uses standard control flow statements, the asynchronous version is forced to create multiple lambda expressions to express the continuation logic in the form of delegates.

And this is a fairly simple example! Imagine what the asynchronous code would look like if, for example, the synchronous code contained a loop that retried the operation three times if it failed, if it tried to contact multiple different servers, if it took a collection of resources rather than a single one, or if all of these possible features occurred together. Adding those features to the synchronous version would be straightforward, but it is not at all clear how to do so in the asynchronous version. Rewriting synchronous methods into asynchronous methods by explicitly specifying the continuation of each task gets very complicated very quickly even if the synchronous continuations are what appear to be very simple control flows.

The Task-Based Asynchronous Pattern with async and await

Fortunately, it turns out that it is actually not too difficult to write a computer program that does these complex code transformations for you. The designers of the C# language realized this need would crop up, and they have added such a capability to the C# 5.0 compiler. Starting with C# 5.0, you can rewrite the synchronous program given earlier into an asynchronous program much more easily using the Task-based Asynchronous Pattern (TAP); the C# compiler then does the tedious work of transforming your method into a series of task continuations. Listing 18.15 shows how to rewrite Listing 18.13 into an asynchronous method without the major structural changes of Listing 18.14.

LISTING 18.15: An Asynchronous Web Request Using the Task-Based Asynchronous Pattern


using System;
using System.IO;
using System.Net;
using System.Linq;
using System.Threading.Tasks;

public class Program
{
private static async Task WriteWebRequestSizeAsync(
string url)
{
try
{
WebRequest webRequest =
WebRequest.Create(url);
WebResponse response =
await webRequest.GetResponseAsync();
using(StreamReader reader =
new StreamReader(
response.GetResponseStream()))
{
string text =
await reader.ReadToEndAsync();
Console.WriteLine(
FormatBytes(text.Length));
}
}
catch(WebException)
{
// ...
}
catch(IOException )
{
// ...
}
catch(NotSupportedException )
{
// ...
}
}

public static void Main(string[] args)
{
string url = "http://www.IntelliTect.com";
if(args.Length > 0)
{
url = args[0];
}

Console.Write(url);

Task task = WriteWebRequestSizeAsync(url);

while(!task.Wait(100))
{
Console.Write(".");
}
}

// ...

}


Notice the small differences between Listing 18.13 and Listing 18.15. First, we refactor the body of the web request functionality into a new method (WriteWebRequestSizeAsync()) and add the new contextual keyword async to the method’s declaration. A method decorated with this keyword must return Task, Task<T>, or void. In this case, since there is no data returned by the body of the method but we still want the capability of returning information about the asynchronous activity to the caller, WriteWebRequestSizeAsync() returns Task. Notice the method name suffix is Async; this is not necessary, but it is conventional to mark asynchronous methods this way so as to identify their asynchronous behavior. Finally, everywhere there is an asynchronous equivalent for the synchronous method, we insert the new contextual keyword await before invoking the asynchronous version.

Notice that nothing else changes between Listings 18.13 and 18.15. The asynchronous method versions seemingly still return the same data types as before—despite the fact that each actually returns a Task<T>. This is not via some magical implicit cast, either. GetResponseAsync() is declared as follows:

public virtual Task<WebResponse> GetResponseAsync() { ... }

At the call site, we assign the return value to WebResponse:

WebResponse response = await webRequest.GetResponseAsync();

The await contextual keyword plays a critical role by signaling to the compiler that it should rewrite the expression into a state machine that represents all the control flow we saw in Listing 18.14 (and more).

Also notice the try/catch logic improvements over Listing 18.14 that appear in Listing 18.15. In Listing 18.15, there is no need to catch an AggregateException. The catch clause continues to catch the exact type of exception expected, with no unwrapping of the inner exceptions required. Rather, the compiler’s rewrite ensures that the AggregateException in the task is processed just as if it were a normal, synchronously thrown exception. In reality, the AggregateException (and its internal exception collection) continues to operate as expected; when you await the task, the rewrite pulls the first exception from the collection and throws it. The aim is to make the asynchronous code look as much as possible like the synchronous code.

To better understand the control flow, Table 18.2 shows each task in a separate column along with the execution that occurs on each task.

Image

TABLE 18.2: Control Flow within Each Task

There are a couple of important misconceptions that the table helps to dismiss:

Misconception #1: A method decorated with the async keyword is automatically executed on a worker thread when called. This is absolutely not true; the method is executed normally, on the calling thread, and if the implementation doesn’t await any incomplete awaitable tasks, it will complete synchronously on the same thread. It is the method’s implementation that is responsible for starting any asynchronous work. Merely using the async keyword does not change where the method’s code executes. Also, there is nothing unusual about a call to an async method from the caller’s perspective; it is a method typed as returning a Task, it is called normally, and it returns an object of its return type normally.

Misconception #2: The await keyword causes the current thread to block until the awaited task is completed. That is also absolutely not true. If you want the current thread to block until the task completes, call the Wait() method, as we have already described. In fact, the Main thread does so repeatedly while waiting for the other tasks to complete. However, the while(!task.Wait(100)) { } call executes concurrently with the other tasks—not synchronously. The await keyword evaluates the expression that follows it, which is usually of type Task or Task<T>, adds a continuation to the resultant task, and then immediately returns control to the caller. The creation of the task has started asynchronous work; the await keyword means that the developer wishes the caller of this method to continue executing its work on this thread while the asynchronous work is processed. At some point after that asynchronous work is complete, execution will resume at the point of control following the await expression.

In fact, the principal reasons why the async keyword exists in the first place are twofold. First, it makes it crystal clear to the reader of the code that the method that follows will be automatically rewritten by the compiler. Second, it informs the compiler that usages of the await contextual keyword in the method are to be treated as asynchronous control flow, and not as an ordinary identifier.

Asynchronous Lambdas

Just as a lambda expression converted to a delegate can be used as a concise syntax for declaring a normal method, so C# 5.0 (and later) also allows lambdas containing await expressions to be converted to delegates. To do so, just precede the lambda expression with the async keyword. In Listing 18.16, we rewrite the WriteWebRequestSizeAsync() method from Listing 18.15 from an async method to an async lambda.

LISTING 18.16: An Asynchronous Client-Server Interaction As a Lambda Expression


using System;
using System.IO;
using System.Net;
using System.Linq;
using System.Threading.Tasks;

public class Program
{

public static void Main(string[] args)
{
string url = "http://www.IntelliTect.com";
if(args.Length > 0)
{
url = args[0];
}

Console.Write(url);

Func<string, Task> writeWebRequestSizeAsync =
async (string webRequestUrl) =>
{
// Error handling omitted for
// elucidation.
WebRequest webRequest =
WebRequest.Create(webRequestUrl);

WebResponse response =
await webRequest.GetResponseAsync();
using(StreamReader reader =
new StreamReader(
response.GetResponseStream()))
{
string text =
(await reader.ReadToEndAsync());
Console.WriteLine(
FormatBytes(text.Length));
}
};

Task task = writeWebRequestSizeAsync(url);

while (!task.Wait(100))
{
Console.Write(".");
}
}

// ...

}


Note that an async lambda expression has the exact same restrictions as the named async method:

• An async lambda expression must be converted to a delegate whose return type is void, Task, or Task<T>.

• The lambda is rewritten so that return statements become signals that the task returned by the lambda has completed with the given result.

• Execution within the lambda expression occurs synchronously until the first await on an incomplete awaitable is executed.

• All instructions following the await will execute as continuations on the return from the invoked asynchronous method (or, if the awaitable is already complete, will be simply executed synchronously rather than as continuations).

• An async lambda expression can be invoked with an await (not shown in Listing 18.16; see the sketch below).
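
A minimal sketch of that last point, assuming the code appears inside an async method:

Func<Task<int>> getNumberAsync = async () =>
{
    await Task.Delay(100);
    return 42;
};
int number = await getNumberAsync();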


Advanced Topic: Implementing a Custom Asynchronous Method

Implementing an asynchronous method by relying on other asynchronous methods (which, in turn, rely on more asynchronous methods) is relatively easy with the await keyword. However, at some point in the call hierarchy it becomes necessary to write a “leaf” asynchronous Task-returning method. Consider, for example, an asynchronous method for running a command-line program with the eventual goal that the output could be accessed. Such a method would be declared as follows:

static public Task<Process> RunProcessAsync(string filename)

The simplest implementation would, of course, be to rely on Task.Run() again and call both the System.Diagnostics.Process’s Start() and WaitForExit() methods. However, creating an additional thread in the current process is unnecessary when the invoked process itself will have its own collection of one or more threads. To implement the RunProcessAsync() method and return to the caller’s synchronization context when the invoked process completes, we can rely on a TaskCompletionSource<T> object, as shown in Listing 18.17.

LISTING 18.17: Implementing a Custom Asynchronous Method


using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static public Task<Process> RunProcessAsync(
string fileName,
string arguments = null,
CancellationToken cancellationToken =
default(CancellationToken))
{
TaskCompletionSource<Process> taskCS =
new TaskCompletionSource<Process>();

Process process = new Process()
{
StartInfo = new ProcessStartInfo(fileName)
{
UseShellExecute = false,
Arguments = arguments
},
EnableRaisingEvents = true
};

process.Exited += (sender, localEventArgs) =>
{
taskCS.SetResult(process);
};

cancellationToken
.ThrowIfCancellationRequested();

process.Start();

cancellationToken.Register(() =>
{
process.CloseMainWindow();
});

return taskCS.Task;
}

// ...
}


Ignore the highlighting for the moment and instead focus on the pattern of using an event for notification when the process completes. Since System.Diagnostics.Process includes a notification upon exit, we register for this notification and use it as a callback from which we can invoke TaskCompletionSource.SetResult(). The code in Listing 18.17 follows a fairly common pattern that you can use to create an asynchronous method without having to resort to Task.Run().

Another important characteristic that an async method might require is cancellation. TAP relies on the same methods for cancellation as the TPL does—namely, a System.Threading.CancellationToken. Listing 18.17 highlights the code necessary to support cancellation. In this example, we allow for canceling before the process ever starts, as well as an attempt to close the application’s main window (if there is one). A more aggressive approach would be to call Process.Kill(), but this could potentially cause problems for the program that is executing.

Notice that we don’t register for the cancellation event until after the process is started. This avoids any race conditions that might occur if cancellation is triggered before the process actually begins.

One last feature to consider supporting is a progress update. Listing 18.18 is the full version of RunProcessAsync() with just such an update.

LISTING 18.18: Implementing a Custom Asynchronous Method with Progress Support


using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static public Task<Process> RunProcessAsync(
string fileName,
string arguments = null,
CancellationToken cancellationToken =
default(CancellationToken),
IProgress<ProcessProgressEventArgs> progress =
null,
object objectState = null)
{
TaskCompletionSource<Process> taskCS =
new TaskCompletionSource<Process>();

Process process = new Process()
{
StartInfo = new ProcessStartInfo(fileName)
{
UseShellExecute = false,
Arguments = arguments,
RedirectStandardOutput =
progress != null
},
EnableRaisingEvents = true
};

process.Exited += (sender, localEventArgs) =>
{
taskCS.SetResult(process);
};

if(progress != null)
{
process.OutputDataReceived +=
(sender, localEventArgs) =>
{
progress.Report(
new ProcessProgressEventArgs(
localEventArgs.Data,
objectState));
};
}

if(cancellationToken.IsCancellationRequested)
{
cancellationToken
.ThrowIfCancellationRequested();
}

process.Start();

if(progress != null)
{
process.BeginOutputReadLine();
}

cancellationToken.Register(() =>
{
process.CloseMainWindow();
cancellationToken
.ThrowIfCancellationRequested();
});

return taskCS.Task;
}
// ...
}


class ProcessProgressEventArgs
{
// ...
}



Advanced Topic: Awaiting Non-Task<T> Values

Generally, the expression that follows the await keyword is of either type Task or type Task<T>. In the examples of await shown so far in this chapter, the expressions that follow the keyword have all returned Task<T>. From a syntax perspective, an await operating on type Task is essentially the equivalent of an expression that returns void. In fact, because the compiler does not even know whether the task has a result, much less which type it is, such an expression is classified in the same way as a call to a void-returning method; that is, you can use it only in a statement context. Listing 18.19 shows some await expressions used as statement expressions.

LISTING 18.19: An await Expression May Be a Statement Expression


async Task<int> DoStuffAsync()
{
await DoSomethingAsync();
await DoSomethingElseAsync();
return await GetAnIntegerAsync() + 1;
}


Here we presume that the first two methods return a Task rather than a Task<T>. Since there is no result value associated with the first two tasks, awaiting them produces no value; thus each expression must appear as a statement. The third task is presumably of type Task<int>, and its value can be used in the computation of the result of the task returned by DoStuffAsync().

This Advanced Topic begins with the word Generally—a deliberate injection of incertitude. In fact, the exact rule regarding the type that await requires is more general than just Task or Task<T>. Rather, it requires only that the type support a GetAwaiter() method. This method produces an object that has certain properties and methods needed by the compiler’s rewriting logic. This makes the system extensible by third parties.8 If you want to design your own non-Task-based asynchrony system that uses some other type to represent asynchronous work, you can do so and still use the await syntax.

8. This technique of allowing third-party extension by looking for a particular method by its signature is used in two other C# features: LINQ looks for methods like Select() and Where() by name to implement the select and where contextual keywords, and the foreach loop does not require that the collection implement IEnumerable, just that it have an appropriateGetEnumerator() method.
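
One example from the framework itself is Task.Yield(), which returns a YieldAwaitable rather than a Task, yet can still be awaited because the type provides GetAwaiter():

async Task DoWorkAsync()
{
    // Awaiting a YieldAwaitable, not a Task
    await Task.Yield();
    // ...
}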

Note that it is not possible to make async methods return something other than void, Task, or Task<T>, no matter which type is awaited inside the method.


Wrapping your head around precisely what is happening in an async method can be difficult, but it is far less difficult than trying to figure out what asynchronous code written with explicit continuations in lambdas is doing. The key points to remember are as follows:

• When control reaches an await keyword, the expression that follows it produces a task.9 Control then returns to the caller so that it can continue to do work while the task completes asynchronously.

9. Technically, it is an awaitable type as described in the Advanced Topic titled “Awaiting Non-Task<T> Values.”

• Some time after the task completes, control resumes at the point following the await. If the awaited task produces a result, that result is then obtained. If it faulted, the exception is thrown.

• A return statement in an async method causes the task associated with the method invocation to become completed; if the return statement has a value, the value returned becomes the result of the task.

Task Schedulers and the Synchronization Context

On occasion, this chapter has mentioned the task scheduler and its role in determining how to assign work to threads efficiently. Programmatically, the task scheduler is an instance of System.Threading.Tasks.TaskScheduler. This class, by default, uses the thread pool to schedule tasks appropriately, determining how to safely and efficiently execute them—when to reuse threads, dispose of them, or create additional ones.

It is possible to create your own task scheduler that makes different choices about how to schedule tasks by deriving a new type from the TaskScheduler class. You can obtain a TaskScheduler that will schedule a task to the current thread (or, more precisely, to the synchronization context associated with the current thread), rather than to a different worker thread, by using the static FromCurrentSynchronizationContext() method.10

10. For an example, see Listing C.8 in Multithreading Patterns Prior to C# 5.0, available at IntelliTect.com/EssentialCSharp.
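
A minimal sketch, assuming the code runs on a thread that has a synchronization context (a UI thread, for example); ComputeResult() and UpdateStatus() are placeholders:

TaskScheduler uiScheduler =
    TaskScheduler.FromCurrentSynchronizationContext();
Task.Run(() => ComputeResult())
    .ContinueWith(
        antecedent => UpdateStatus(antecedent.Result),
        uiScheduler);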

The synchronization context under which a task—and, in turn, its continuation task(s)—executes is important, because the awaiting task consults the synchronization context (assuming there is one) so that a task can execute efficiently and safely. Listing 18.20 (along with Output 18.7) is similar to Listing 18.5 except that it also prints out the thread ID when it displays the message.

LISTING 18.20: Calling Task.ContinueWith()


using System;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
public static void Main()
{
DisplayStatus("Before");
Task taskA =
Task.Run(() =>
DisplayStatus("Starting..."))
.ContinueWith( antecedent =>
DisplayStatus("Continuing A..."));
Task taskB = taskA.ContinueWith( antecedent =>
DisplayStatus("Continuing B..."));
Task taskC = taskA.ContinueWith( antecedent =>
DisplayStatus("Continuing C..."));

Task.WaitAll(taskB, taskC);
DisplayStatus("Finished!");
}

private static void DisplayStatus(string message)
{
string text =
$@"{ Thread.CurrentThread.ManagedThreadId }: { message }";
Console.WriteLine(text);
}
}


OUTPUT 18.7

1: Before
3: Starting...
4: Continuing A...
3: Continuing C...
4: Continuing B...
1: Finished!

What is noteworthy about this output is that the thread ID sometimes changes and sometimes repeats. In this kind of plain console application, the synchronization context (accessible from SynchronizationContext.Current) is null, so the default task scheduler—the thread pool—handles thread allocation instead. This explains why the thread ID changes between tasks: Sometimes the thread pool determines that it is more efficient to use a new thread, and sometimes it decides that the best course of action is to reuse an existing thread.

Fortunately, the synchronization context gets set automatically for the types of applications where that is critical. For example, if the code creating tasks is running in a thread created by ASP.NET, the thread will have a synchronization context of type AspNetSynchronizationContext associated with it. In contrast, if your code is running in a thread created in a Windows UI application (WPF or Windows Forms), the thread will have an instance of DispatcherSynchronizationContext associated with it. (For console applications, there is no synchronization context by default.) Since the TPL consults the synchronization context, and the synchronization context varies depending on the circumstances of the execution, the TPL is able to schedule continuations that execute in contexts that are both efficient and safe.

To modify the code so that the synchronization context is leveraged instead, you must (1) set the synchronization context and (2) use async/await so that the synchronization context is consulted.11

11. For a simple example of how to set the synchronization context of a thread, and how to use a task scheduler to schedule a task to that thread, see Listing C.8 in Multithreading Patterns Prior to C# 5.0, available at IntelliTect.com/EssentialCSharp.

It is possible to define custom synchronization contexts, and to work with existing synchronization contexts to improve their performance in some specific scenarios. However, describing how to do so is beyond the scope of this text.

async/await with the Windows UI

One place where synchronization is especially important is in the context of UI and Web programming. With the Windows UI, for example, a message pump processes messages such as mouse click and move events. Furthermore, the UI is single-threaded, so interaction with any UI component (a text box, for example) must always occur from the single UI thread. One of the key advantages of the async/await pattern is that it leverages the synchronization context to ensure that continuation work—work that appears after the await statement—will always execute on the same synchronization context that invoked the await statement. This approach is of significant value because it eliminates the need to explicitly switch back to the UI thread to update a control.

To better appreciate this benefit, consider the example of a UI event for a button click in WPF, as shown in Listing 18.21.

LISTING 18.21: Synchronous High-Latency Invocation in WPF


using System;
using System.Net.NetworkInformation;
using System.Windows;

private void PingButton_Click(
    object sender, RoutedEventArgs e)
{
    StatusLabel.Text = "Pinging...";
    UpdateLayout();
    Ping ping = new Ping();
    PingReply pingReply =
        ping.Send("www.IntelliTect.com");
    StatusLabel.Text = pingReply.Status.ToString();
}


Given that StatusLabel is a WPF System.Windows.Controls.TextBlock control and we have updated its Text property twice within the PingButton_Click() event subscriber, it would be a reasonable assumption that "Pinging..." would be displayed until Ping.Send() returned, and that the label would then be updated with the status of the Send() reply. As those experienced with Windows UI frameworks well know, this is not, in fact, what happens. Rather, a message is posted to the Windows message pump to update the text with "Pinging...", but because the UI thread is busy executing the PingButton_Click() method, the message pump is not processed. By the time the UI thread frees up to look at the message pump, a second Text property update request has been queued, and the only status the user is able to observe is the final one.

To fix this problem using TAP, we change the code as shown in Listing 18.22.

LISTING 18.22: Asynchronous High-Latency Invocation in WPF Using await


using System;
using System.Net.NetworkInformation;
using System.Windows;

private async void PingButton_Click(
    object sender, RoutedEventArgs e)
{
    StatusLabel.Text = "Pinging...";
    UpdateLayout();
    Ping ping = new Ping();
    PingReply pingReply =
        await ping.SendPingAsync("www.IntelliTect.com");
    StatusLabel.Text = pingReply.Status.ToString();
}


This change offers two advantages. First, the asynchronous nature of the ping call frees up the caller thread to return to the Windows message pump, so the update to StatusLabel.Text is processed and "Pinging..." appears to the user. Second, when the awaited ping.SendPingAsync() completes, the continuation will always execute on the same synchronization context as the caller. Because that synchronization context is specific to the Windows UI, it is single-threaded, so the return will always be to the same thread: the UI thread. In other words, rather than immediately executing the continuation task, the TPL consults the synchronization context, which instead posts a message regarding the continuation work to the message pump. Because the UI thread monitors the message pump, upon picking up the continuation work message, it invokes the code following the await. (As a result, the continuation code runs on the same thread as the caller that processed the message pump.)

There is a key code readability feature built into the TAP language pattern. Notice in Listing 18.22 that the assignment of pingReply.Status to StatusLabel.Text appears to flow naturally after the await, providing a clear indication that it will execute immediately following the previous line. However, writing what really happens from scratch would be far less understandable, for multiple reasons.
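For comparison, consider a rough sketch of Listing 18.22 written without await, explicitly scheduling the continuation back onto the caller's synchronization context with TaskScheduler.FromCurrentSynchronizationContext(). (This is an approximation for illustration, not the compiler's actual rewrite.)


private void PingButton_Click(
    object sender, RoutedEventArgs e)
{
    StatusLabel.Text = "Pinging...";
    UpdateLayout();
    Ping ping = new Ping();
    ping.SendPingAsync("www.IntelliTect.com")
        .ContinueWith(antecedent =>
        {
            // Runs on the UI thread only because of the
            // scheduler argument below
            StatusLabel.Text =
                antecedent.Result.Status.ToString();
        },
        TaskScheduler.FromCurrentSynchronizationContext());
}


Even this version glosses over details that await handles automatically; accessing antecedent.Result, for example, wraps any failure in an AggregateException, which the await version unwraps for you.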

await Operators

There is no limit on the number of times that await can appear in a single method. Nor are await expressions restricted to appearing one after another: they can be placed into loops and are then processed consecutively, following the natural control flow of the code as it appears. Consider the example in Listing 18.23.

LISTING 18.23: Iterating over an Await Operation


using System;
using System.Collections.Generic;
using System.Net.NetworkInformation;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;

private async void PingButton_Click(
    object sender, RoutedEventArgs e)
{
    List<string> urls = new List<string>()
    {
        "www.habitat-spokane.org",
        "www.partnersintl.org",
        "www.iassist.org",
        "www.fh.org",
        "www.worldvision.org"
    };
    IPStatus status;

    Func<string, Task<IPStatus>> func =
        async (localUrl) =>
        {
            Ping ping = new Ping();
            PingReply pingReply =
                await ping.SendPingAsync(localUrl);
            return pingReply.Status;
        };

    StatusLabel.Text = "Pinging...";

    foreach(string url in urls)
    {
        status = await func(url);
        StatusLabel.Text =
            $@"{ url }: { status } ({
                Thread.CurrentThread.ManagedThreadId })";
    }
}


Regardless of whether the await statements occur within an iteration or as separate entries, they will execute serially, one after the other and in the same order they were invoked from the calling thread. The underlying implementation is to string them together in the semantic equivalent of Task.ContinueWith(), except that all of the code between the await operators will execute in the caller’s synchronization context.

Support for TAP from the UI is one of the key scenarios that led to TAP's creation. A second scenario occurs on the server, when a request comes in from a client to query an entire table's worth of data from the database. Because the query could be time-consuming, a new thread might be created rather than consuming one from the limited number allocated to the thread pool. The problem with this approach is that the work to query the database executes entirely on another machine; there is no reason to block an entire thread, given that the thread is generally not active anyway.

To summarize, TAP was created to address these key problems:

• There is a need to allow long-running activities to occur without blocking the UI thread.

• Creating a new thread (or Task) for non–CPU-intensive work is relatively expensive when you consider that all the thread is doing is waiting for the activity to complete.

• When the activity completes (either by using a new thread or via a callback), it is frequently necessary to make a thread synchronization context switch back to the original caller that initiated the activity.

In response, TAP provides a new pattern that works for both CPU-intensive and non–CPU-intensive asynchronous invocations, one that all .NET languages support explicitly.

Executing Loop Iterations in Parallel

Consider the for loop statement and associated code shown in Listing 18.24 (with the corresponding output in Output 18.8). The listing calls a method that calculates a section of the decimal expansion of pi, where the parameters are the number of digits and the digit to start with. The actual calculation is not germane to the discussion. What is interesting about this calculation is that it is embarrassingly parallelizable; that is, it is almost embarrassing how easy it is to split up a large task, say, computing 1 million decimal digits of pi, into any desired number of smaller tasks that can all be run in parallel. These types of computations are the easiest ones to speed up by adding parallelism.

LISTING 18.24: For Loop Synchronously Calculating Pi in Sections


using System;
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

class Program
{
    const int TotalDigits = 100;
    const int BatchSize = 10;

    static void Main()
    {
        string pi = null;
        const int iterations = TotalDigits / BatchSize;
        for(int i = 0; i < iterations; i++)
        {
            pi += PiCalculator.Calculate(
                BatchSize, i * BatchSize);
        }

        Console.WriteLine(pi);
    }
}


using System;

class PiCalculator
{
    public static string Calculate(
        int digits, int startingAt)
    {
        // ...
    }

    // ...
}


OUTPUT 18.8

3.14159265358979323846264338327950288419716939937510582097494459230781
64062862089986280348253421170679821480865132823066470938446095505822317
25359408128481117450284102701938521105559644622948954930381964428810975
66593344612847564823378678316527120190914564856692346034861045432664821
33936072602491412737245870066063155881748815209209628292540917153643678
92590360011330530548820466521384146951941511609433057270365759591953092
18611738193261179310511854807446237996274956735188575272489122793818301
194912

The for loop executes each iteration synchronously and sequentially. However, because the pi calculation algorithm splits the calculation into independent pieces, it is not necessary to compute the pieces sequentially, as long as the results are appended in the right order. Imagine what would happen if you could have all the iterations of this loop run concurrently: Each processor could take a single iteration and execute it in parallel with other processors executing other iterations. Given the simultaneous execution of iterations, we could decrease the execution time in proportion to the number of processors.

The TPL provides a convenient method, Parallel.For(), that does precisely that. Listing 18.25 shows how to modify the sequential, single-threaded program in Listing 18.24 to use the helper method.

LISTING 18.25: For Loop Calculating Pi in Sections in Parallel


using System;
using System.Threading.Tasks;
using AddisonWesley.Michaelis.EssentialCSharp.Shared;

// ...

class Program
{
    static void Main()
    {
        string pi = null;
        const int iterations = TotalDigits / BatchSize;
        string[] sections = new string[iterations];
        Parallel.For(0, iterations, (i) =>
        {
            sections[i] = PiCalculator.Calculate(
                BatchSize, i * BatchSize);
        });
        pi = string.Join("", sections);
        Console.WriteLine(pi);
    }
}


The output for Listing 18.25 is identical to Output 18.8; however, the execution time is significantly faster if you have multiple CPUs (and possibly slower if you do not). The Parallel.For() API is designed to look similar to a standard for loop. The first parameter is the fromInclusive value, the second is the toExclusive value, and the last is the Action<int> to perform as the loop body. When using a lambda expression for the action, the code looks similar to a for loop statement, except that now each iteration may execute in parallel. As with the for loop, the call to Parallel.For() will not complete until all iterations are complete. In other words, by the time execution reaches the string.Join() statement, all sections of pi will have been calculated.

Note that the code for combining the various sections of pi no longer occurs inside the iteration (action) in Listing 18.25. Because sections of the pi calculation will very likely not complete sequentially, appending a section whenever an iteration completes would likely append them out of order. Even if sequence were not a problem, there would still be a potential race condition, because the += operator is not atomic. To address both problems, each section of pi is stored into its own array element, and no two iterations access the same element of the array simultaneously. Only once all sections of pi are calculated does string.Join() combine them. In other words, we postpone concatenating the sections until after the Parallel.For() loop has completed, avoiding any race condition caused by sections not yet calculated or sections concatenated out of order.
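For contrast, the following sketch shows the approach that Listing 18.25 deliberately avoids; appending to pi from within the parallel iterations is subject to both out-of-order concatenation and a race condition on the += operator:


// Do NOT do this: iterations may complete in any order, and the
// += operation (read, concatenate, write back) is not atomic
string pi = null;
Parallel.For(0, iterations, (i) =>
{
    pi += PiCalculator.Calculate(BatchSize, i * BatchSize);
});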

The TPL uses the same sorts of thread pooling techniques that it uses for task scheduling to ensure good performance of the parallel loop: It will try to ensure that CPUs are not overscheduled, and so on.


Guidelines

DO use parallel loops when the computations performed can be easily split up into many mutually independent processor-bound computations that can be executed in any order on any thread.


The TPL also provides a similar parallel version of the foreach statement, as shown in Listing 18.26.

LISTING 18.26: Parallel Execution of a foreach Loop


using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

class Program
{
    // ...
    static void EncryptFiles(
        string directoryPath, string searchPattern)
    {
        IEnumerable<string> files = Directory.EnumerateFiles(
            directoryPath, searchPattern,
            SearchOption.AllDirectories);

        Parallel.ForEach(files, (fileName) =>
        {
            Encrypt(fileName);
        });
    }
    // ...
}


In this example, we call a method that encrypts each file within the files collection. It does so in parallel, using as many threads as the TPL determines to be efficient.


Advanced Topic: How the TPL Tunes Its Own Performance

The default scheduler within the TPL targets the thread pool, which uses a variety of heuristics to try to ensure that the right number of threads is executing at any one time. Two of the heuristics it uses are hill climbing and work stealing.

The hill climbing algorithm involves creating threads to run tasks, and then monitoring the performance of those tasks to try to experimentally determine the point at which adding more threads begins making performance worse. Once that point is reached, the number of threads can then be decreased back to the number that produced the best performance.

The TPL associates “top-level” tasks that are waiting to be executed with no particular thread. If, however, a task running on a thread itself creates another task, the newly created task is associated with that thread automatically. When the new “child” task is eventually scheduled to run, it usually runs on the same thread as the task that created it. The work stealing algorithm identifies threads that have an unusually large or unusually small amount of pending work; a thread that has too few tasks associated with it will sometimes “steal” not-yet-executed tasks from threads that have too many tasks waiting to run.

The key feature of these algorithms is that they enable the TPL to dynamically tune its own performance to mitigate processor overscheduling and underscheduling, and to balance the work among the available processors.

The TPL generally does a good job of tuning its own performance, but you can help it do a better job by providing hints about the best course of action. Specifying the TPL TaskCreationOptions.LongRunning option described earlier in the section “Long-Running Tasks” is an example of such a hint. You can also explicitly tell the task scheduler how many threads you think would be best to service a parallel loop; see the Advanced Topic titled “Parallel Loop Options” for more details.
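For example, here is a minimal sketch of supplying the LongRunning hint; the DoLongRunningWork() delegate is a hypothetical placeholder:


// Hint that this task may run for a long time, so the scheduler
// can give it a dedicated thread rather than a thread pool thread
Task task = Task.Factory.StartNew(
    () => DoLongRunningWork(),  // hypothetical work delegate
    TaskCreationOptions.LongRunning);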



Beginner Topic: Parallel Loop Exception Handling with AggregateException

We know already that the TPL catches and saves exceptions associated with tasks in an AggregateException, because a given task might have several exceptions obtained from its subtasks. This is also the case with parallel execution of loops: Each iteration could have produced an exception, so the exceptions need to be gathered up into one aggregating exception. Consider the example in Listing 18.27 and its output in Output 18.9.

LISTING 18.27: Unhandled Exception Handling for Parallel Iterations


using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    // ...
    static void EncryptFiles(
        string directoryPath, string searchPattern)
    {
        IEnumerable<string> files = Directory.EnumerateFiles(
            directoryPath, searchPattern,
            SearchOption.AllDirectories);
        try
        {
            Parallel.ForEach(files, (fileName) =>
            {
                Encrypt(fileName);
            });
        }
        catch(AggregateException exception)
        {
            Console.WriteLine(
                "ERROR: {0}:",
                exception.GetType().Name);
            foreach(Exception item in
                exception.InnerExceptions)
            {
                Console.WriteLine(" {0} - {1}",
                    item.GetType().Name, item.Message);
            }
        }
    }
    // ...
}


OUTPUT 18.9

ERROR: AggregateException:
UnauthorizedAccessException - Attempted to perform an unauthorized
operation.
UnauthorizedAccessException - Attempted to perform an unauthorized
operation.
UnauthorizedAccessException - Attempted to perform an unauthorized
operation.

Output 18.9 shows that three exceptions occurred while executing the Parallel.ForEach<T>(...) loop. However, in the code, there is only one catch, of type System.AggregateException. The UnauthorizedAccessExceptions were retrieved from the InnerExceptions property on the AggregateException. With a Parallel.ForEach<T>() loop, each iteration could potentially throw an exception, so the System.AggregateException thrown by the method call will contain each of those exceptions within its InnerExceptions property.


Canceling a Parallel Loop

Unlike a task, which requires an explicit call if it is to block until it completes, a parallel loop executes iterations in parallel but does not itself return until the entire loop completes. Canceling a parallel loop, therefore, generally involves invoking the cancellation request from a thread other than the one executing the loop. In Listing 18.28, we invoke Parallel.ForEach<T>() using Task.Run(). In this manner, not only does the loop execute in parallel, but it also executes asynchronously, allowing the code to prompt the user to "Push ENTER to exit."

LISTING 18.28: Canceling a Parallel Loop


using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
    // ...

    static void EncryptFiles(
        string directoryPath, string searchPattern)
    {
        string stars =
            "*".PadRight(Console.WindowWidth-1, '*');

        IEnumerable<string> files = Directory.GetFiles(
            directoryPath, searchPattern,
            SearchOption.AllDirectories);

        CancellationTokenSource cts =
            new CancellationTokenSource();
        ParallelOptions parallelOptions =
            new ParallelOptions
                { CancellationToken = cts.Token };
        cts.Token.Register(
            () => Console.WriteLine("Canceling..."));

        Console.WriteLine("Push ENTER to exit.");

        // Use Task.Factory.StartNew<string>() for
        // TPL prior to .NET 4.5
        Task task = Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(
                    files, parallelOptions,
                    (fileName, loopState) =>
                    {
                        Encrypt(fileName);
                    });
            }
            catch(OperationCanceledException){}
        });

        // Wait for the user's input
        Console.Read();

        // Cancel the loop
        cts.Cancel();
        Console.Write(stars);
        task.Wait();
    }
}


The parallel loops use the same cancellation token pattern that tasks use. The token obtained from a CancellationTokenSource is associated with the parallel loop by calling an overload of the ForEach() method that has a parameter of type ParallelOptions. This object contains the cancellation token.

Note that if you cancel a parallel loop operation, any iterations that have not started yet are prevented from starting through a check of the IsCancellationRequested property. Existing executing iterations will run to their respective termination points. Furthermore, calling Cancel() even after all iterations have completed will still cause the registered cancel delegate (via cts.Token.Register()) to execute.

The only means by which the ForEach() method is able to acknowledge that the loop has been canceled is via the OperationCanceledException. Given that cancellation in this example is expected, the exception is caught and ignored, allowing the application to display “Canceling...”, followed by a line of stars before exiting.


Advanced Topic: Parallel Loop Options

Although not generally necessary, it is possible to control the maximum degree of parallelism (that is, the number of threads that are scheduled to run at the same time) via the ParallelOptions parameter on overloads of both the Parallel.For() and Parallel.ForEach<T>() loops. In some specific cases, the developer may know more about the particular algorithm or circumstance such that changing the maximum degree of parallelism makes sense. These circumstances include the following:

• Scenarios where you want to disable parallelism to make debugging or analysis easier. Setting the maximum degree of parallelism to 1 ensures that the loop iterations do not run concurrently.

• Scenarios where you know ahead of time that the degree of parallelism will be gated on an external factor such as a hardware constraint. For example, if your parallel operation involves using multiple USB ports, it is possible that there is no point in creating more threads than there are available ports.

• Scenarios with very long-running loop iterations (for example, minutes or hours). The thread pool can’t distinguish long-running iterations from blocked operations, so it could end up introducing many new threads, all of which will be consumed by the for loop. This can produce incremental thread growth over time, leaving a huge number of threads in the process.

In such cases, control the maximum degree of parallelism with the MaxDegreeOfParallelism property on the ParallelOptions object.
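For example, here is a minimal sketch of the first scenario, disabling parallelism to simplify debugging; the iterations count and the DoWork() iteration body are hypothetical placeholders:


ParallelOptions options = new ParallelOptions
{
    // With a maximum of 1, iterations no longer run concurrently,
    // which simplifies debugging and analysis
    MaxDegreeOfParallelism = 1
};
Parallel.For(0, iterations, options, (i) =>
{
    DoWork(i);  // hypothetical iteration body
});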

You can also use the ParallelOptions object’s TaskScheduler property to specify a custom task scheduler to use to schedule the tasks associated with each iteration. For example, you might have an asynchronous event handler that responds to the user’s click of a “Next” button. If the user clicks the button several times, you might want to use a custom task scheduler that prioritizes the most recently created task, rather than prioritizing the task that has waited the longest. The task scheduler provides a means of specifying how the tasks will execute in relation to one another.

The ParallelOptions object also has a CancellationToken property that provides a mechanism to communicate to the loop that no further iterations should start. Additionally, the body of an iteration can watch the cancellation token to determine if an early exit from the iteration is in order.
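For instance, the following sketch shows an iteration body watching the token, reusing the files collection, the parallelOptions object, and the Encrypt() method from Listing 18.28:


Parallel.ForEach(files, parallelOptions, (fileName) =>
{
    // Exit this iteration early if cancellation was requested
    if(parallelOptions.CancellationToken.IsCancellationRequested)
    {
        return;
    }
    Encrypt(fileName);
});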



Advanced Topic: Breaking a Parallel Loop

Like a standard for loop, the Parallel.For() loop supports the concept of “breaking” to exit the loop and canceling any further iterations. In the context of parallel for execution, however, a break signifies that no new iterations following the breaking iteration should start. All currently executing iterations, however, will run to completion.

To break a parallel loop, you can provide a cancellation token and cancel it on another thread, as described in the preceding Advanced Topic. You can also use an overload of the Parallel.For() method whose body delegate takes two parameters: the index and a ParallelLoopState object. An iteration that wishes to “break” the loop can call the Break() or Stop() method on the loop state object passed to the delegate. The Break() method indicates that no more iterations with index values higher than the current value need to execute; the Stop() method indicates that no more iterations need to run at all.

For example, suppose you have a Parallel.For() loop that is performing ten iterations in parallel. Some of those iterations might run faster than others, and the task scheduler does not guarantee that they will run in any particular order. Suppose the first iteration has completed; iterations 3, 5, 7, and 9 are “in flight,” scheduled to four different threads; and iterations 5 and 7 both call Break(). In this scenario, iterations 6, 8, and 10 will never start, but iterations 2 and 4 will still be scheduled to run. Iterations 3 and 9 will run to completion because they were already started when the break happened.

The Parallel.For() and Parallel.ForEach<T>() methods return a reference to a ParallelLoopResult object that contains useful information about what happened during the loop. This result object has the following properties:

• IsCompleted returns a Boolean indicating whether all iterations started.

• LowestBreakIteration identifies the lowest iteration that executed a break. The value is of type long?, where a value of null indicates that no call to Break() was encountered.

Returning to the ten-iteration example, the IsCompleted property will return false and the LowestBreakIteration will return a value of 5.
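The following minimal sketch pulls these pieces together; the iteration body is a placeholder, and exactly which iterations run before the break depends on scheduling:


ParallelLoopResult result =
    Parallel.For(0, 10, (i, loopState) =>
    {
        if(i == 5)
        {
            // No iterations with an index above 5 need to start
            loopState.Break();
        }
        // ... placeholder for the iteration's real work
    });
Console.WriteLine(result.IsCompleted);           // False
Console.WriteLine(result.LowestBreakIteration);  // 5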


Running LINQ Queries in Parallel

Just as it is possible to execute a loop in parallel using Parallel.For(), so it is also possible to execute LINQ queries in parallel using the Parallel LINQ API (PLINQ, for short). An example of a simple nonparallel LINQ expression is shown in Listing 18.29; in Listing 18.30, we modify it to run in parallel.

LISTING 18.29: LINQ Select()


using System.Collections.Generic;
using System.Linq;

class Cryptographer
{
    // ...
    public List<string>
        Encrypt(IEnumerable<string> data)
    {
        return data.Select(
            item => Encrypt(item)).ToList();
    }
    // ...
}


In Listing 18.29, a LINQ query uses the Select() standard query operator to encrypt each string within a sequence of strings, and convert the resultant sequence to a list. This seems like an “embarrassingly parallel” operation; each encryption is likely to be a high-latency processor-bound operation that could be farmed out to a worker thread on another CPU.

Listing 18.30 shows how to modify Listing 18.29 so that the code that encrypts the strings is executed in parallel.

LISTING 18.30: Parallel LINQ Select()


using System.Collections.Generic;
using System.Linq;

class Cryptographer
{
    // ...
    public List<string> Encrypt(IEnumerable<string> data)
    {
        return data.AsParallel().Select(
            item => Encrypt(item)).ToList();
    }
    // ...
}


As Listing 18.30 shows, the change to enable parallel support is extremely small! All that it uses is a .NET Framework 4.0–introduced standard query operator, AsParallel(), which can be found on the static class System.Linq.ParallelEnumerable. This simple extension method tells the runtime that it can execute the query in parallel. The result is that on machines with multiple available CPUs, the total time taken to execute the query can be significantly shorter.

System.Linq.ParallelEnumerable includes a superset of the query operators available on System.Linq.Enumerable, resulting in possible performance improvements for all of the common query operators, including those used for sorting, filtering (Where()), projecting (Select()), joining, grouping, and aggregating. Listing 18.31 shows how to do a parallel sort.

LISTING 18.31: Parallel LINQ with Standard Query Operators


// ...
OrderedParallelQuery<string> parallelGroups =
    data.AsParallel().OrderBy(item => item);

// Show the total count of items still
// matches the original count
System.Diagnostics.Trace.Assert(
    data.Count == parallelGroups.Count());
// ...


As Listing 18.31 shows, invoking the parallel version simply involves a call to the AsParallel() extension method. Notice that the type of the result returned by the parallel standard query operators is either ParallelQuery<T> or OrderedParallelQuery<T>; both inform the compiler that it should continue to use the parallel versions of the standard query operations that are available.

Given that query expressions are simply syntactic sugar for the method call form of the query used in Listings 18.30 and 18.31, you can just as easily use AsParallel() with the expression form. Listing 18.32 shows an example of executing a grouping operation in parallel using query expression syntax.

LISTING 18.32: Parallel LINQ with Query Expressions


// ...
ParallelQuery<IGrouping<char, string>> parallelGroups;
parallelGroups =
    from text in data.AsParallel()
    orderby text
    group text by text[0];

// Show the total count of items still
// matches the original count
System.Diagnostics.Trace.Assert(
    data.Count == parallelGroups.Sum(
        item => item.Count()));
// ...


As you saw in the previous examples, converting a query or iteration loop to execute in parallel is simple. There is one significant caveat, however: As we will discuss in depth in Chapter 19, you must take care not to allow multiple threads to inappropriately access and modify the same memory simultaneously. Doing so will cause a race condition.

As we saw earlier in this chapter, the Parallel.For() and Parallel.ForEach<T>() methods will gather up any exceptions thrown during the parallel iterations and then throw one aggregating exception containing all of the original exceptions. PLINQ operations are no different. That is, they also have the potential of returning multiple exceptions for the exact same reason: When the query logic is run on each element in parallel, the code executing on each element can independently throw an exception. Unsurprisingly, PLINQ deals with this situation in exactly the same way as do parallel loops and the TPL: Exceptions thrown during parallel queries are accessible via the InnerExceptions property of the AggregateException. Therefore, wrapping a PLINQ query in a try/catch block with the exception type of System.AggregateException will successfully handle any exceptions within each iteration that were unhandled.
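Reusing the Cryptographer example from Listing 18.30, such a try/catch block might look like the following sketch:


try
{
    List<string> encrypted = data.AsParallel().Select(
        item => Encrypt(item)).ToList();
}
catch(AggregateException exception)
{
    // Each element's unhandled exception appears here
    foreach(Exception item in exception.InnerExceptions)
    {
        Console.WriteLine($"{ item.GetType().Name } - { item.Message }");
    }
}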

Canceling a PLINQ Query

As expected, the cancellation request pattern is also available on PLINQ queries. Listing 18.33 (with Output 18.10) provides an example. Like the parallel loops, canceled PLINQ queries will throw a System.OperationCanceledException. Also like the parallel loops, executing a PLINQ query is a synchronous operation on the invoking thread. Thus, a common technique is to wrap the parallel query in a task that runs on another thread so that the current thread can cancel it if necessary—the same solution used in Listing 18.28.

LISTING 18.33: Canceling a PLINQ Query


using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
    public static List<string> ParallelEncrypt(
        List<string> data,
        CancellationToken cancellationToken)
    {
        return data.AsParallel().WithCancellation(
            cancellationToken).Select(
                (item) => Encrypt(item)).ToList();
    }

    public static void Main()
    {
        ConsoleColor originalColor = Console.ForegroundColor;
        List<string> data = Utility.GetData(100000).ToList();

        CancellationTokenSource cts =
            new CancellationTokenSource();

        Console.WriteLine("Push ENTER to Exit.");

        // Use Task.Factory.StartNew<string>() for
        // TPL prior to .NET 4.5
        Task task = Task.Run(() =>
        {
            data = ParallelEncrypt(data, cts.Token);
        }, cts.Token);

        // Wait for the user's input
        Console.Read();

        if(!task.IsCompleted)
        {
            cts.Cancel();
            try { task.Wait(); }
            catch(AggregateException exception)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                TaskCanceledException taskCanceledException =
                    (TaskCanceledException)exception.Flatten()
                        .InnerExceptions
                        .FirstOrDefault(
                            innerException =>
                                innerException.GetType() ==
                                    typeof(TaskCanceledException));
                if(taskCanceledException != null)
                {
                    Console.WriteLine($@"Cancelled: {
                        taskCanceledException.Message }");
                }
                else
                {
                    // ...
                }
            }
        }
        else
        {
            task.Wait();
            Console.ForegroundColor = ConsoleColor.Green;
            Console.Write("Completed successfully");
        }
        Console.ForegroundColor = originalColor;
    }

    // ...
}


OUTPUT 18.10

Cancelled: A task was canceled.

As with a parallel loop or task, canceling a PLINQ query requires a CancellationToken, which is available from a CancellationTokenSource. However, rather than overloading every PLINQ query to support the cancellation token, the ParallelQuery<T> object returned by IEnumerable’s AsParallel() method includes a WithCancellation() extension method that simply takes a CancellationToken. As a result, calling Cancel() on the CancellationTokenSource object will request the parallel query to cancel—because it checks the IsCancellationRequested property on the CancellationToken.

As mentioned, canceling a PLINQ query will throw an exception in place of returning the complete result. One common technique for dealing with a possibly canceled PLINQ query is to wrap the query in a try block and catch the OperationCanceledException. A second common technique, used in Listing 18.33, is to pass the CancellationToken both to ParallelEncrypt() and as a second parameter on Run(). This will cause task.Wait() to throw an AggregateException whose InnerExceptions property will include a TaskCanceledException. The aggregating exception can then be caught, just as you would catch any other exception from a parallel operation.
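A minimal sketch of the first technique, reusing the ParallelEncrypt() method and cancellation token source from Listing 18.33, follows:


try
{
    List<string> encrypted = ParallelEncrypt(data, cts.Token);
}
catch(OperationCanceledException)
{
    // The query was canceled before producing a complete result
    Console.WriteLine("Canceled");
}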

Summary

In this chapter, we started by examining the basic parts of multithreaded programs: the Thread class, which represents an independent “point of control” in a program, and the ThreadPool, which encourages efficient allocation and scheduling of threads to multiple CPUs. However, these APIs are low-level entities that are difficult to work with directly. Starting with Version 4.0, the .NET Framework provides the Parallel Extensions library, which includes the Task Parallel Library (TPL) and Parallel LINQ (PLINQ). Both provide new APIs for creating and scheduling units of work represented by Task objects, executing loops in parallel using Parallel.For() and Parallel.ForEach(), and automatically parallelizing LINQ queries with AsParallel().

We also discussed how C# 5.0 makes programming complex workflows with Task objects much easier by automatically rewriting your programs to manage the continuation “wiring” that composes larger tasks out of smaller tasks.

At the beginning of this chapter, we briefly glossed over some of the difficult problems that developers often face when writing multithreaded programs: atomicity problems, deadlocks, and other “race conditions” that introduce uncertainty and bad behavior into multithreaded programs. The standard way to avoid these problems is to carefully write code that uses “locks” to synchronize access to shared resources; this is the topic of the next chapter.
