Chapter 8. Collections

Using the proper collections is essential in concurrent applications. I’m not talking about the standard collections like List<T>; I assume you already know about those. The purpose of this chapter is to introduce newer collections that are specifically intended for concurrent or asynchronous use.

Immutable collections are collection instances that can never change. At first glance, this sounds completely useless; but they’re actually very useful even in single-threaded, nonconcurrent applications. Read-only operations (such as enumeration) act directly on the immutable instance. Write operations (such as adding an item) return a new immutable instance instead of changing the existing instance. This is not as wasteful as it first sounds because most of the time immutable collections share most of their memory. Furthermore, immutable collections have the advantage of being implicitly safe to access from multiple threads; since they cannot change, they are threadsafe.

TIP

Immutable collections are in the Microsoft.Bcl.Immutable NuGet package.

At the time of this writing, immutable collections are new, but they should be considered for all new development unless you need a mutable instance. If you’re not familiar with immutable collections, I recommend that you start with Recipe 8.1, even if you don’t need a stack or queue, because I’ll cover several common patterns that all immutable collections follow.

If you need to construct an immutable collection with lots of existing elements, there are special ways to do this efficiently; the example code in these recipes only adds elements one at a time. The MSDN documentation has details on how to efficiently construct immutable collections if you need to speed up your initialization.
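For example, here is a minimal sketch of bulk construction; CreateRange builds the collection in a single pass rather than allocating an intermediate instance per Add (the input sequence here is purely illustrative):

// Builds the entire list in one pass from an existing sequence.
var numbers = Enumerable.Range(0, 1000);
ImmutableList<int> list = ImmutableList.CreateRange(numbers);

Table 8-1 details the platform availability of immutable collections.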

Table 8-1. Immutable collections platform availability

Platform                  ImmutableStack<T>, etc.
.NET 4.5                  Yes
.NET 4.0                  No
Mono iOS/Droid            Yes
Windows Store             Yes
Windows Phone Apps 8.1    Yes
Windows Phone SL 8.0      Yes
Windows Phone SL 7.1      No
Silverlight 5             No

Threadsafe collections are mutable collection instances that can be changed by multiple threads simultaneously. Threadsafe collections use a mixture of fine-grained locks and lock-free techniques to ensure that threads are blocked for a minimal amount of time (and usually are not blocked at all). For many threadsafe collections, enumeration of the collection actually creates a snapshot of the collection and then enumerates that snapshot. The key advantage of threadsafe collections is that they can be accessed safely from multiple threads, yet the operations will only block your code for a short time, if at all. Table 8-2 details the platform availability of threadsafe collections.

Table 8-2. Threadsafe collections platform availability

Platform                  ConcurrentDictionary<TKey, TValue>, etc.
.NET 4.5                  Yes
.NET 4.0                  Yes
Mono iOS/Droid            Yes
Windows Store             Yes
Windows Phone Apps 8.1    Yes
Windows Phone SL 8.0      No
Windows Phone SL 7.1      No
Silverlight 5             No

Producer/consumer collections are mutable collection instances that are designed with a specific purpose in mind: to allow (possibly multiple) producers to push items to the collection while allowing (possibly multiple) consumers to pull items out of the collection. So they act as a bridge between producer code and consumer code, and they also have an option to limit the number of items in the collection. Producer/consumer collections can have either a blocking or an asynchronous API. For example, when the collection is empty, a blocking producer/consumer collection will block the calling consumer thread until another item is added; an asynchronous producer/consumer collection will instead allow the calling consumer to asynchronously wait until another item is added. Table 8-3 details the platform availability of producer/consumer collections.

TIP

AsyncProducerConsumerQueue<T> and AsyncCollection<T> are in the Nito.AsyncEx NuGet package. BufferBlock<T> is in the Microsoft.Tpl.Dataflow NuGet package.

Table 8-3. Producer/consumer collections platform availability

Platform                  BlockingCollection<T>   BufferBlock<T>   AsyncProducerConsumerQueue<T>   AsyncCollection<T>
.NET 4.5                  Yes                     Yes              Yes                             Yes
.NET 4.0                  Yes                     No               Yes                             Yes
Mono iOS/Droid            Yes                     Yes              Yes                             Yes
Windows Store             Yes                     Yes              Yes                             Yes
Windows Phone Apps 8.1    Yes                     Yes              Yes                             Yes
Windows Phone SL 8.0      No                      Yes              Yes                             No
Windows Phone SL 7.1      No                      No               Yes                             No
Silverlight 5             No                      No               Yes                             No

There are a number of different producer/consumer collections used in the recipes in this chapter, and different producer/consumer collections have different advantages. Table 8-4 may be helpful in determining which one you should use.

Table 8-4. Producer/consumer collections

Feature                   BlockingCollection<T>   BufferBlock<T>   AsyncProducerConsumerQueue<T>   AsyncCollection<T>
Queue semantics           Yes                     Yes              Yes                             Yes
Stack/bag semantics       Yes                     No               No                              Yes
Synchronous API           Yes                     Yes              Yes                             Yes
Asynchronous API          No                      Yes              Yes                             Yes
Mobile platform support   Partial                 Partial          Yes                             Partial
Tested by Microsoft       Yes                     Yes              No                              No

8.1. Immutable Stacks and Queues

Problem

You need a stack or queue that does not change very often and can be accessed by multiple threads safely.

For example, a queue can be used as a sequence of operations to perform, and a stack can be used as a sequence of undo operations.

Solution

Immutable stacks and queues are the simplest immutable collections. They behave very similarly to the standard Stack<T> and Queue<T>. Performance-wise, immutable stacks and queues have the same time complexity as standard stacks and queues; however, in simple scenarios where the collections are updated frequently, the standard stack and queue are faster.

Stacks are a last-in, first-out data structure. The following code creates an empty immutable stack, pushes two items, enumerates the items, and then pops an item:

var stack = ImmutableStack<int>.Empty;
stack = stack.Push(13);
stack = stack.Push(7);

// Displays "7" followed by "13".
foreach (var item in stack)
    Trace.WriteLine(item);

int lastItem;
stack = stack.Pop(out lastItem);
// lastItem == 7

Note that we keep overwriting the local variable stack in the preceding example. Immutable collections follow a pattern where they return an updated collection; the original collection reference is unchanged. This means that once you have a reference to a particular immutable collection instance, it will never change; consider the following example:

var stack = ImmutableStack<int>.Empty;
stack = stack.Push(13);
var biggerStack = stack.Push(7);

// Displays "7" followed by "13".
foreach (var item in biggerStack)
    Trace.WriteLine(item);

// Only displays "13".
foreach (var item in stack)
    Trace.WriteLine(item);

Under the covers, the two stacks are actually sharing the memory used to contain the item 13. This kind of implementation is very efficient while also allowing you to easily snapshot the current state. Each immutable collection instance is naturally threadsafe, but immutable collections can also be useful in single-threaded applications. In my experience, immutable collections are especially useful when the code is more functional or when you need to store a large number of snapshots and want them to share memory as much as possible.

Queues are similar to stacks, except they are a first-in, first-out data structure. The following code creates an empty immutable queue, enqueues two items, enumerates the items, and then dequeues an item:

var queue = ImmutableQueue<int>.Empty;
queue = queue.Enqueue(13);
queue = queue.Enqueue(7);

// Displays "13" followed by "7".
foreach (var item in queue)
    Trace.WriteLine(item);

int nextItem;
queue = queue.Dequeue(out nextItem);
// Displays "13".
Trace.WriteLine(nextItem);

Discussion

This recipe introduced the two simplest immutable collections, the stack and the queue. However, we covered several important design philosophies that are true for all immutable collections:

§ An instance of an immutable collection never changes.

§ Since it never changes, it is naturally threadsafe.

§ When you call a modifying method on an immutable collection, the modified collection is returned.

Immutable collections are ideal for sharing state. However, they don’t work as well as communication conduits. In particular, don’t use an immutable queue to communicate between threads; producer/consumer queues work much better for that.

TIP

ImmutableStack<T> and ImmutableQueue<T> are in the Microsoft.Bcl.Immutable NuGet package.

See Also

Recipe 8.6 covers threadsafe (blocking) mutable queues.

Recipe 8.7 covers threadsafe (blocking) mutable stacks.

Recipe 8.8 covers async-compatible mutable queues.

Recipe 8.9 covers async-compatible mutable stacks.

Recipe 8.10 covers blocking/asynchronous mutable queues.

8.2. Immutable Lists

Problem

You need a data structure you can index into that does not change very often and can be accessed by multiple threads safely.

A list is a general-purpose data structure that can be used for all kinds of application state.

Solution

Immutable lists do allow indexing, but you need to be aware of the performance characteristics. They’re not just a drop-in replacement for List<T>.

ImmutableList<T> supports methods similar to those of List<T>, as this example shows:

var list = ImmutableList<int>.Empty;
list = list.Insert(0, 13);
list = list.Insert(0, 7);

// Displays "7" followed by "13".
foreach (var item in list)
    Trace.WriteLine(item);

list = list.RemoveAt(1);

However, the immutable list is internally organized as a binary tree. This is done so that immutable list instances may maximize the amount of memory they share with other instances. As a result, there are performance differences between ImmutableList<T> and List<T> for some common operations (Table 8-5).

Table 8-5. Performance difference of immutable lists

Operation      List<T>           ImmutableList<T>
Add            amortized O(1)    O(log N)
Insert         O(N)              O(log N)
RemoveAt       O(N)              O(log N)
Item[index]    O(1)              O(log N)

Of particular note, the indexing operation for ImmutableList<T> is O(log N), not O(1) as you may expect. If you are replacing List<T> with ImmutableList<T> in existing code, you’ll need to consider how your algorithms access items in the collection.

This means that you should use foreach instead of for whenever possible. A foreach loop over an ImmutableList<T> executes in O(N) time, while a for loop over the same collection executes in O(N * log N) time:

// The best way to iterate over an ImmutableList<T>.
foreach (var item in list)
    Trace.WriteLine(item);

// This will also work, but it will be much slower.
for (int i = 0; i != list.Count; ++i)
    Trace.WriteLine(list[i]);

Discussion

ImmutableList<T> is a good general-purpose data structure, but because of its performance differences, you can’t blindly replace all your List<T> uses with it. List<T> is commonly used by default—that is, it’s the one you use unless you need a different collection. ImmutableList<T> isn’t quite that ubiquitous; you’ll need to consider the other immutable collections carefully and choose the one that makes the most sense for your situation.

TIP

ImmutableList<T> is in the Microsoft.Bcl.Immutable NuGet package.

See Also

Recipe 8.1 covers immutable stacks and queues, which are like lists that only allow certain elements to be accessed.

MSDN documentation on ImmutableList<T>.Builder, an efficient way to populate an immutable list.

8.3. Immutable Sets

Problem

You need a data structure that does not need to store duplicates, does not change very often, and can be accessed by multiple threads safely.

For example, an index of words from a file would be a good use case for a set.

Solution

There are two immutable set types: ImmutableHashSet<T> is just a collection of unique items, and ImmutableSortedSet<T> is a sorted collection of unique items. Both types of sets have a similar interface:

var hashSet = ImmutableHashSet<int>.Empty;
hashSet = hashSet.Add(13);
hashSet = hashSet.Add(7);

// Displays "7" and "13" in an unpredictable order.
foreach (var item in hashSet)
    Trace.WriteLine(item);

hashSet = hashSet.Remove(7);

Only the sorted set allows indexing into it like a list:

var sortedSet = ImmutableSortedSet<int>.Empty;
sortedSet = sortedSet.Add(13);
sortedSet = sortedSet.Add(7);

// Displays "7" followed by "13".
foreach (var item in sortedSet)
    Trace.WriteLine(item);

var smallestItem = sortedSet[0];
// smallestItem == 7

sortedSet = sortedSet.Remove(7);

Unsorted sets and sorted sets have similar performance (see Table 8-6).

Table 8-6. Performance of immutable sets

Operation      ImmutableHashSet<T>   ImmutableSortedSet<T>
Add            O(log N)              O(log N)
Remove         O(log N)              O(log N)
Item[index]    n/a                   O(log N)

However, I recommend you use an unsorted set unless you know it needs to be sorted. Many types only support basic equality and not full comparison, so an unsorted set can be used for many more types than a sorted set.
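As an illustration, consider a hypothetical type that supports equality (reference equality by default) but defines no ordering; this sketch is not from the library documentation:

class Person
{
    public string Name { get; set; }
}

// Works: a hash set only needs equality.
var people = ImmutableHashSet<Person>.Empty
    .Add(new Person { Name = "Alice" })
    .Add(new Person { Name = "Bob" });

// Compiles but throws at runtime once elements must be compared,
// because Person does not implement IComparable<Person>:
// var sorted = ImmutableSortedSet<Person>.Empty
//     .Add(new Person { Name = "Alice" })
//     .Add(new Person { Name = "Bob" });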

One important note about the sorted set is that its indexing is O(log N), not O(1), just like ImmutableList<T>, which we looked at in Recipe 8.2. This means the same caveat applies: you should use foreach instead of for whenever possible with an ImmutableSortedSet<T>.

Discussion

Immutable sets are useful data structures, but populating a large immutable set can be slow. Most immutable collections have special builders that can be used to construct them quickly in a mutable way and then convert them into an immutable collection. This is true for many immutable collections, but I’ve found them most useful for immutable sets.
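As a sketch, populating a set with its builder looks like this; words is a hypothetical sequence of strings, echoing the word-index scenario from the Problem section:

// Builders allow fast, mutable adds; no intermediate immutable
// instances are allocated along the way.
var builder = ImmutableHashSet.CreateBuilder<string>();
foreach (var word in words)
    builder.Add(word);
ImmutableHashSet<string> index = builder.ToImmutable();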

TIP

ImmutableHashSet<T> and ImmutableSortedSet<T> are in the Microsoft.Bcl.Immutable NuGet package.

See Also

Recipe 8.7 covers threadsafe mutable bags, which are similar to sets.

Recipe 8.9 covers async-compatible mutable bags.

MSDN documentation on ImmutableHashSet<T>.Builder, an efficient way to populate an immutable hash set.

MSDN documentation on ImmutableSortedSet<T>.Builder, an efficient way to populate an immutable sorted set.

8.4. Immutable Dictionaries

Problem

You need a key/value collection that does not change very often and can be accessed by multiple threads safely.

For example, you may want to store reference data in a lookup collection; the reference data rarely changes but should be available to different threads.

Solution

There are two immutable dictionary types: ImmutableDictionary<TKey, TValue> and ImmutableSortedDictionary<TKey, TValue>. As you may be able to guess, ImmutableSortedDictionary ensures that its elements are sorted, while the items in ImmutableDictionary have an unpredictable order.

Both of these collection types have very similar members:

var dictionary = ImmutableDictionary<int, string>.Empty;
dictionary = dictionary.Add(10, "Ten");
dictionary = dictionary.Add(21, "Twenty-One");
dictionary = dictionary.SetItem(10, "Diez");

// Displays "10Diez" and "21Twenty-One" in an unpredictable order.
foreach (var item in dictionary)
    Trace.WriteLine(item.Key + item.Value);

var ten = dictionary[10];
// ten == "Diez"

dictionary = dictionary.Remove(21);

Note the use of SetItem. In a mutable dictionary, you could do something like dictionary[key] = item, but immutable dictionaries must return the updated immutable dictionary, so they use the SetItem method instead:

var sortedDictionary = ImmutableSortedDictionary<int, string>.Empty;
sortedDictionary = sortedDictionary.Add(10, "Ten");
sortedDictionary = sortedDictionary.Add(21, "Twenty-One");
sortedDictionary = sortedDictionary.SetItem(10, "Diez");

// Displays "10Diez" followed by "21Twenty-One".
foreach (var item in sortedDictionary)
    Trace.WriteLine(item.Key + item.Value);

var ten = sortedDictionary[10];
// ten == "Diez"

sortedDictionary = sortedDictionary.Remove(21);

Unsorted dictionaries and sorted dictionaries have similar performance (see Table 8-7), but I recommend you use an unsorted dictionary unless you need your elements to be sorted. Unsorted dictionaries can be a little faster overall. Furthermore, unsorted dictionaries can be used with any key types, while sorted dictionaries require their key types to be fully comparable.

Table 8-7. Performance of immutable dictionaries

Operation     ImmutableDictionary<TK,TV>   ImmutableSortedDictionary<TK,TV>
Add           O(log N)                     O(log N)
SetItem       O(log N)                     O(log N)
Item[key]     O(log N)                     O(log N)
Remove        O(log N)                     O(log N)

Discussion

In my experience, dictionaries are a common and useful tool when dealing with application state. They can be used in any kind of key/value or lookup scenario.

Like other immutable collections, immutable dictionaries have a builder mechanism for efficient construction if the dictionary contains many elements. For example, if you load your initial reference data at startup, you should use the builder mechanism to construct the initial immutable dictionary. On the other hand, if your reference data is gradually built up during your application’s execution, then using the regular immutable dictionary Add method is likely acceptable.
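Here’s a minimal sketch of that startup pattern; LoadReferenceData is a hypothetical method returning the initial key/value pairs:

// Build once at startup with the mutable builder, then freeze.
var builder = ImmutableDictionary.CreateBuilder<int, string>();
foreach (var pair in LoadReferenceData()) // hypothetical data source
    builder.Add(pair.Key, pair.Value);
ImmutableDictionary<int, string> referenceData = builder.ToImmutable();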

TIP

ImmutableDictionary<TK, TV> and ImmutableSortedDictionary<TK, TV> are in the Microsoft.Bcl.Immutable NuGet package.

See Also

Recipe 8.5 covers threadsafe mutable dictionaries.

MSDN documentation on ImmutableDictionary<TK,TV>.Builder, an efficient way to populate an immutable dictionary.

MSDN documentation on ImmutableSortedDictionary<TK,TV>.Builder, an efficient way to populate an immutable sorted dictionary.

8.5. Threadsafe Dictionaries

Problem

You have a key/value collection that you need to keep in sync, even though multiple threads are both reading from and writing to it.

For example, consider a simple in-memory cache.

Solution

The ConcurrentDictionary<TKey, TValue> type in the .NET framework is a true gem of data structures. It is threadsafe, using a mixture of fine-grained locks and lock-free techniques to ensure fast access in the vast majority of scenarios.

Its API does take a bit of getting used to. It is not at all like the standard Dictionary<TKey, TValue> type, since it must deal with concurrent access from multiple threads. However, once you learn the basics in this recipe, you’ll find ConcurrentDictionary<TKey, TValue> to be one of the most useful collection types.

First, let’s cover writing a value to the collection. To set the value of a key, you can use AddOrUpdate like this:

var dictionary = new ConcurrentDictionary<int, string>();
var newValue = dictionary.AddOrUpdate(0,
    key => "Zero",
    (key, oldValue) => "Zero");

AddOrUpdate is a bit complex because it must do several things, depending on the current contents of the concurrent dictionary. The first method argument is the key. The second argument is a delegate that transforms the key (in this case, 0) into a value to be added to the dictionary (in this case, "Zero"). This delegate is only invoked if the key does not exist in the dictionary. The third argument is another delegate that transforms the key (0) and the old value into an updated value to be stored in the dictionary ("Zero"). This delegate is only invoked if the key does exist in the dictionary. AddOrUpdate returns the new value for that key (the same value that was returned by one of the delegates).

Now for the part that really bends your brain: in order for the concurrent dictionary to work properly, AddOrUpdate might have to invoke either (or both) delegates multiple times. This is very rare, but it can happen. So your delegates should be simple and fast and not cause side effects. This means that your delegate should only create the value; it should not change any other variables in your application. The same principle holds for all delegates you pass to methods on ConcurrentDictionary<TKey, TValue>.
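To make this concrete, here’s a sketch contrasting a side-effecting delegate with a pure one; the counter is purely illustrative:

// Avoid this: the side effect may execute more than once.
int invocationCount = 0;
dictionary.AddOrUpdate(0,
    key => { ++invocationCount; return "Zero"; },
    (key, oldValue) => { ++invocationCount; return "Zero"; });

// Prefer this: the delegates only compute the value.
dictionary.AddOrUpdate(0,
    key => "Zero",
    (key, oldValue) => "Zero");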

That was the hard part because it had to deal with all the thread safety concerns. The rest of the API is easier.

In fact, there are several other ways to add values to a dictionary. One shortcut is to just use indexing syntax:

// Using the same "dictionary" as above.

// Adds (or updates) key 0 to have the value "Zero".

dictionary[0] = "Zero";

Indexing syntax is less powerful; it does not give you the ability to update a value based on the existing value. However, the syntax is simpler and works fine if you already have the value you want to store in the dictionary.

Let’s look at how to read a value. This can be easily done via TryGetValue:

// Using the same "dictionary" as above.
string currentValue;
bool keyExists = dictionary.TryGetValue(0, out currentValue);

TryGetValue will return true and set the out value if the key was found in the dictionary. If the key was not found, TryGetValue will return false. You can also use indexing syntax to read values, but I find that much less useful because it will throw an exception if a key is not found. Keep in mind that a concurrent dictionary has multiple threads reading, updating, adding, and removing values; in many situations, it’s difficult to know whether a key exists or not until you attempt to read it.

Removing values is just as easy as reading them:

// Using the same "dictionary" as above.
string removedValue;
bool keyExisted = dictionary.TryRemove(0, out removedValue);

TryRemove is almost identical to TryGetValue, except (of course) it removes the key/value pair if the key was found in the dictionary.

Discussion

I think ConcurrentDictionary<TKey, TValue> is awesome, mainly because of the incredibly powerful AddOrUpdate method. However, it doesn’t fit the bill in every situation. ConcurrentDictionary<TKey, TValue> is best when you have multiple threads reading and writing to a shared collection. If updates are rare, then ImmutableDictionary<TKey, TValue> may be a better choice.

ConcurrentDictionary<TKey, TValue> is best in a shared-data situation, where multiple threads share the same collection. If some threads only add elements and other threads only remove elements, you’d be better served by a producer/consumer collection.

ConcurrentDictionary<TKey, TValue> is not the only threadsafe collection. The BCL also provides ConcurrentStack<T>, ConcurrentQueue<T>, and ConcurrentBag<T>. However, those threadsafe collections are seldom used by themselves; they are usually only used in the implementation of producer/consumer collections, which we will cover in the rest of this chapter.

See Also

Recipe 8.4 covers immutable dictionaries, which are ideal if the contents of the dictionary change very rarely.

8.6. Blocking Queues

Problem

You need a conduit to pass messages or data from one thread to another. For example, one thread could be loading data, which it pushes down the conduit as it loads; meanwhile, there are other threads on the receiving end of the conduit that receive the data and process it.

Solution

The .NET type BlockingCollection<T> was designed to be this kind of conduit. By default, BlockingCollection<T> is a blocking queue, providing first-in, first-out behavior.

A blocking queue needs to be shared by multiple threads, so it is usually defined as a private, read-only field:

private readonly BlockingCollection<int> _blockingQueue =
    new BlockingCollection<int>();

Usually, a thread will either add items to the collection or remove items from the collection, but not both. Threads that add items are called producer threads, and threads that remove items are called consumer threads.

Producer threads can add items by calling Add, and when the producer thread is finished (i.e., all items have been added), it can finish the collection by calling CompleteAdding. This notifies the collection that no more items will be added to it, and the collection can then inform its consumers that there are no more items.

Here’s a simple example of a producer that adds two items and then marks the collection complete:

_blockingQueue.Add(7);
_blockingQueue.Add(13);
_blockingQueue.CompleteAdding();

Consumer threads usually run in a loop, waiting for the next item and then processing it. If you take the producer code and put it in a separate thread (e.g., via Task.Run), then you can consume those items like this:

// Displays "7" followed by "13".
foreach (var item in _blockingQueue.GetConsumingEnumerable())
    Trace.WriteLine(item);

If you want to have multiple consumers, GetConsumingEnumerable can be called from multiple threads at the same time. However, each item is only passed to one of those threads. When the collection is completed, the enumerable completes.
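For example, here’s a sketch of two consumer threads sharing the queue defined earlier; each item is received by exactly one of them:

// Each consumer gets its own consuming enumerable over the same queue.
Task.WaitAll(
    Task.Run(() =>
    {
        foreach (var item in _blockingQueue.GetConsumingEnumerable())
            Trace.WriteLine("Consumer A: " + item);
    }),
    Task.Run(() =>
    {
        foreach (var item in _blockingQueue.GetConsumingEnumerable())
            Trace.WriteLine("Consumer B: " + item);
    }));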

When you use conduits like this, you do need to consider what happens if your producers run faster than your consumers (unless you are sure that your consumers will always run faster). If you’re producing items faster than you can consume them, then you may need to throttle your queue. BlockingCollection<T> makes this easy; you can throttle the number of items by passing the appropriate value when you create it. This simple example limits the collection to a single item:

BlockingCollection<int> _blockingQueue = new BlockingCollection<int>(
    boundedCapacity: 1);

Now, the same producer code will behave differently, as noted by the comments:

// This Add completes immediately.
_blockingQueue.Add(7);

// This Add waits for the 7 to be removed before it adds the 13.
_blockingQueue.Add(13);

_blockingQueue.CompleteAdding();

Discussion

The preceding examples all use GetConsumingEnumerable for the consumer threads; this is the most common scenario. However, there is also a Take member that allows a consumer to just consume a single item rather than run a loop consuming all the items.
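A sketch of single-item consumption with Take; note that Take blocks until an item is available and throws InvalidOperationException once the collection is completed and empty:

// Removes and returns a single item, blocking until one arrives.
int item = _blockingQueue.Take();
Trace.WriteLine(item);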

Blocking queues are great when you have a separate thread (such as a thread-pool thread) acting as a producer or consumer. They’re not as great when you want to access the conduit asynchronously—for example, if a UI thread wants to act as a consumer. We’ll look at asynchronous queues in Recipe 8.8.

Whenever you introduce a conduit like this into your application, consider switching to the TPL Dataflow library. A lot of the time, using TPL Dataflow is simpler than building your own conduits and background threads. In particular, BufferBlock<T> can act like a blocking queue. However, TPL Dataflow is not available on all platforms, so in some cases, blocking queues are the appropriate design choice.

If you need maximum cross-platform support, you could also use AsyncProducerConsumerQueue<T> from the AsyncEx library, which can act like a blocking queue. Table 8-8 outlines the platform support for blocking queues.

Table 8-8. Platform support for blocking queues

Platform                  BlockingCollection<T>   BufferBlock<T>   AsyncProducerConsumerQueue<T>
.NET 4.5                  Yes                     Yes              Yes
.NET 4.0                  Yes                     No               Yes
Mono iOS/Droid            Yes                     Yes              Yes
Windows Store             Yes                     Yes              Yes
Windows Phone Apps 8.1    Yes                     Yes              Yes
Windows Phone SL 8.0      No                      Yes              Yes
Windows Phone SL 7.1      No                      No               Yes
Silverlight 5             No                      No               Yes

See Also

Recipe 8.7 covers blocking stacks and bags if you want a similar conduit without first-in-first-out semantics.

Recipe 8.8 covers queues that have asynchronous rather than blocking APIs.

Recipe 8.10 covers queues that have both asynchronous and blocking APIs.

8.7. Blocking Stacks and Bags

Problem

You need a conduit to pass messages or data from one thread to another, but you don’t want (or need) the conduit to have first-in, first-out semantics.

Solution

The .NET type BlockingCollection<T> acts as a blocking queue by default, but it can also act like any kind of producer/consumer collection. BlockingCollection<T> is actually a wrapper around a threadsafe collection that implements IProducerConsumerCollection<T>.

So, you can create a BlockingCollection<T> with last-in, first-out (stack) semantics or unordered (bag) semantics like this:

BlockingCollection<int> _blockingStack = new BlockingCollection<int>(
    new ConcurrentStack<int>());
BlockingCollection<int> _blockingBag = new BlockingCollection<int>(
    new ConcurrentBag<int>());

It’s important to keep in mind that there are now race conditions around the ordering of the items. If the producer code completes before any consumer code starts, then the order of the items will be exactly like a stack:

// Producer code
_blockingStack.Add(7);
_blockingStack.Add(13);
_blockingStack.CompleteAdding();

// Consumer code
// Displays "13" followed by "7".
foreach (var item in _blockingStack.GetConsumingEnumerable())
    Trace.WriteLine(item);

However, when the producer code and consumer code are on different threads (which is the usual case), the consumer will always get the most recently added item next. For example, the producer could add 7, the consumer could take 7, the producer could add 13, and the consumer could take 13. The consumer does not wait for CompleteAdding to be called before it returns the first item.

Discussion

The same considerations around throttling that apply to blocking queues also apply to blocking stacks and bags. If your producers run faster than your consumers and you need to limit the memory usage of your blocking stack/bag, you can use throttling exactly like we discussed in Recipe 8.6.

This recipe uses GetConsumingEnumerable for the consumer code; this is the most common scenario. However, there is also a Take member that allows a consumer to just consume a single item rather than run a loop consuming all the items.

If you want to access shared stacks or bags asynchronously rather than by blocking (for example, having your UI thread act as a consumer), see Recipe 8.9.

See Also

Recipe 8.6 covers blocking queues, which are much more commonly used than blocking stacks or bags.

Recipe 8.9 covers asynchronous stacks and bags.

8.8. Asynchronous Queues

Problem

You need a conduit to pass messages or data from one part of code to another in a first-in, first-out manner.

For example, one piece of code could be loading data, which it pushes down the conduit as it loads; meanwhile, the UI thread is receiving the data and displaying it.

Solution

What you need is a queue with an asynchronous API. There is no type like this in the core .NET framework, but there are a couple of options available from NuGet.

The first option is to use BufferBlock<T> from the TPL Dataflow library. The following simple example shows how to declare a BufferBlock<T>, what the producer code looks like, and what the consumer code looks like:

BufferBlock<int> _asyncQueue = new BufferBlock<int>();

// Producer code
await _asyncQueue.SendAsync(7);
await _asyncQueue.SendAsync(13);
_asyncQueue.Complete();

// Consumer code
// Displays "7" followed by "13".
while (await _asyncQueue.OutputAvailableAsync())
    Trace.WriteLine(await _asyncQueue.ReceiveAsync());

BufferBlock<T> also has built-in support for throttling. For full details, see Recipe 8.10.

The example consumer code uses OutputAvailableAsync, which is really only useful if you have a single consumer. If you have multiple consumers, it is possible that OutputAvailableAsync will return true for more than one consumer even though there is only one item. If the queue is completed, then ReceiveAsync will throw InvalidOperationException. So if you have multiple consumers, the consumer code usually looks more like this:

while (true)
{
    int item;
    try
    {
        item = await _asyncQueue.ReceiveAsync();
    }
    catch (InvalidOperationException)
    {
        break;
    }
    Trace.WriteLine(item);
}

If TPL Dataflow is available on your platform, I recommend the BufferBlock<T> solution. Unfortunately, TPL Dataflow is not available everywhere. If BufferBlock<T> isn’t available, you can use the AsyncProducerConsumerQueue<T> type from the Nito.AsyncEx NuGet library. The API is similar to but not exactly the same as BufferBlock<T>:

AsyncProducerConsumerQueue<int> _asyncQueue =
    new AsyncProducerConsumerQueue<int>();

// Producer code
await _asyncQueue.EnqueueAsync(7);
await _asyncQueue.EnqueueAsync(13);
_asyncQueue.CompleteAdding();

// Consumer code
// Displays "7" followed by "13".
while (await _asyncQueue.OutputAvailableAsync())
    Trace.WriteLine(await _asyncQueue.DequeueAsync());

AsyncProducerConsumerQueue<T> has support for throttling, which is necessary if your producers may run faster than your consumers. Just construct the queue with the appropriate value:

AsyncProducerConsumerQueue<int> _asyncQueue =
    new AsyncProducerConsumerQueue<int>(maxCount: 1);

Now, the same producer code will asynchronously wait appropriately:

// This Enqueue completes immediately.
await _asyncQueue.EnqueueAsync(7);

// This Enqueue (asynchronously) waits for the 7 to be removed
// before it enqueues the 13.
await _asyncQueue.EnqueueAsync(13);

_asyncQueue.CompleteAdding();

This consumer code also uses OutputAvailableAsync, and has the same problems as BufferBlock<T>. The AsyncProducerConsumerQueue<T> type provides a TryDequeueAsync member that helps avoid cumbersome consumer code. If you have multiple consumers, the consumer code usually looks more like this:

while (true)
{
    var dequeueResult = await _asyncQueue.TryDequeueAsync();
    if (!dequeueResult.Success)
        break;
    Trace.WriteLine(dequeueResult.Item);
}

Discussion

I do recommend that you use BufferBlock<T> over AsyncProducerConsumerQueue<T>, simply because BufferBlock<T> has been much more thoroughly tested. However, BufferBlock<T> is not available on many platforms, most notably older platforms (see Table 8-9).

TIP

The BufferBlock<T> type is in the Microsoft.Tpl.Dataflow NuGet package. The AsyncProducerConsumerQueue<T> type is in the Nito.AsyncEx NuGet package.

Table 8-9. Platform support for asynchronous queues

Platform                  BufferBlock<T>   AsyncProducerConsumerQueue<T>
.NET 4.5                  Yes              Yes
.NET 4.0                  No               Yes
Mono iOS/Droid            Yes              Yes
Windows Store             Yes              Yes
Windows Phone Apps 8.1    Yes              Yes
Windows Phone SL 8.0      Yes              Yes
Windows Phone SL 7.1      No               Yes
Silverlight 5             No               Yes

See Also

Recipe 8.6 covers producer/consumer queues with blocking semantics rather than asynchronous semantics.

Recipe 8.10 covers producer/consumer queues that have both blocking and asynchronous semantics.

Recipe 8.7 covers asynchronous stacks and bags if you want a similar conduit without first-in, first-out semantics.

8.9. Asynchronous Stacks and Bags

Problem

You need a conduit to pass messages or data from one part of code to another, but you don’t want (or need) the conduit to have first-in, first-out semantics.

Solution

The Nito.AsyncEx library provides a type AsyncCollection<T>, which acts like an asynchronous queue by default, but can also act like any kind of producer/consumer collection. AsyncCollection<T> is a wrapper around an IProducerConsumerCollection<T>. AsyncCollection<T> is the async equivalent of the .NET BlockingCollection<T> that we saw in Recipe 8.7.

AsyncCollection<T> supports last-in, first-out (stack) or unordered (bag) semantics, based on whatever collection you pass to its constructor:

AsyncCollection<int> _asyncStack = new AsyncCollection<int>(
    new ConcurrentStack<int>());
AsyncCollection<int> _asyncBag = new AsyncCollection<int>(
    new ConcurrentBag<int>());

Note that there is a race condition around the ordering of items in the stack. If all producers complete before consumers start, then the order of items is like a regular stack:

// Producer code
await _asyncStack.AddAsync(7);
await _asyncStack.AddAsync(13);
await _asyncStack.CompleteAddingAsync();

// Consumer code
// Displays "13" followed by "7".
while (await _asyncStack.OutputAvailableAsync())
    Trace.WriteLine(await _asyncStack.TakeAsync());

However, when both producers and consumers are executing concurrently (which is the usual case), the consumer will always get the most recently added item next. This will cause the collection as a whole to act not quite like a stack. Of course, the bag collection has no ordering at all.

AsyncCollection<T> has support for throttling, which is necessary if producers may add to the collection faster than the consumers can remove from it. Just construct the collection with the appropriate value:

AsyncCollection<int> _asyncStack = new AsyncCollection<int>(
    new ConcurrentStack<int>(), maxCount: 1);

Now the same producer code will asynchronously wait as needed:

// This Add completes immediately.
await _asyncStack.AddAsync(7);

// This Add (asynchronously) waits for the 7 to be removed
// before it adds the 13.
await _asyncStack.AddAsync(13);

await _asyncStack.CompleteAddingAsync();

The example consumer code uses OutputAvailableAsync, which has the same limitation as described in Recipe 8.8. If you have multiple consumers, the consumer code usually looks more like this:

while (true)
{
    var takeResult = await _asyncStack.TryTakeAsync();
    if (!takeResult.Success)
        break;
    Trace.WriteLine(takeResult.Item);
}

Discussion

AsyncCollection<T> is really just the asynchronous equivalent of BlockingCollection<T> and only supports the same platforms (see Table 8-10).

TIP

The AsyncCollection<T> type is in the Nito.AsyncEx NuGet package.

Table 8-10. Platform support for stacks and bags

Platform                  BlockingCollection<T> (blocking)   AsyncCollection<T> (asynchronous)
.NET 4.5                  Yes                                Yes
.NET 4.0                  Yes                                Yes
Mono iOS/Droid            Yes                                Yes
Windows Store             Yes                                Yes
Windows Phone Apps 8.1    Yes                                Yes
Windows Phone SL 8.0      No                                 No
Windows Phone SL 7.1      No                                 No
Silverlight 5             No                                 No

See Also

Recipe 8.8 covers asynchronous queues, which are much more common than asynchronous stacks or bags.

Recipe 8.7 covers synchronous (blocking) stacks and bags.

8.10. Blocking/Asynchronous Queues

Problem

You need a conduit to pass messages or data from one part of code to another in a first-in, first-out manner, and you need the flexibility to treat either the producer end or the consumer end as synchronous or asynchronous.

For example, a background thread may be loading data and pushing it into the conduit, and you want this thread to synchronously block if the conduit is too full. At the same time, the UI thread is receiving data from the conduit, and you want this thread to asynchronously pull data from the conduit so the UI remains responsive.

Solution

We’ve looked at blocking queues in Recipe 8.6 and asynchronous queues in Recipe 8.8, but there are a few queue types that support both blocking and asynchronous APIs.

The first option is to use BufferBlock<T> and ActionBlock<T> from the TPL Dataflow NuGet library. BufferBlock<T> can be easily used as an asynchronous producer/consumer queue (see Recipe 8.8 for more details):

BufferBlock<int> queue = new BufferBlock<int>();

// Producer code
await queue.SendAsync(7);
await queue.SendAsync(13);
queue.Complete();

// Consumer code for a single consumer
while (await queue.OutputAvailableAsync())
    Trace.WriteLine(await queue.ReceiveAsync());

// Consumer code for multiple consumers
while (true)
{
    int item;
    try
    {
        item = await queue.ReceiveAsync();
    }
    catch (InvalidOperationException)
    {
        break;
    }
    Trace.WriteLine(item);
}

BufferBlock<T> also supports a synchronous API for both producers and consumers:

BufferBlock<int> queue = new BufferBlock<int>();

// Producer code
queue.Post(7);
queue.Post(13);
queue.Complete();

// Consumer code
while (true)
{
    int item;
    try
    {
        item = queue.Receive();
    }
    catch (InvalidOperationException)
    {
        break;
    }
    Trace.WriteLine(item);
}

However, the consumer code using BufferBlock<T> is rather awkward, since it is not the “dataflow way.” The TPL Dataflow library includes a number of blocks that can be linked together, allowing you to define a reactive mesh. In this case, a producer/consumer queue completing with a particular action can be defined using ActionBlock<T>:

// Consumer code is passed to the queue constructor.
ActionBlock<int> queue = new ActionBlock<int>(item => Trace.WriteLine(item));

// Asynchronous producer code
await queue.SendAsync(7);
await queue.SendAsync(13);

// Synchronous producer code
queue.Post(7);
queue.Post(13);

queue.Complete();
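As a sketch of that “dataflow way,” you can also link a BufferBlock<T> to an ActionBlock<T> so items flow from one to the other automatically; the PropagateCompletion option forwards completion through the link:

// Items sent to the buffer flow into the action block automatically.
var buffer = new BufferBlock<int>();
var printer = new ActionBlock<int>(item => Trace.WriteLine(item));
buffer.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

await buffer.SendAsync(7);
buffer.Complete();
await printer.Completion; // waits until all items have been processed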

If the TPL Dataflow library is not available on your desired platform(s), then there is an AsyncProducerConsumerQueue<T> type in Nito.AsyncEx that also supports both synchronous and asynchronous methods:

AsyncProducerConsumerQueue<int> queue = new AsyncProducerConsumerQueue<int>();

// Asynchronous producer code
await queue.EnqueueAsync(7);
await queue.EnqueueAsync(13);

// Synchronous producer code
queue.Enqueue(7);
queue.Enqueue(13);

queue.CompleteAdding();

// Asynchronous single consumer code
while (await queue.OutputAvailableAsync())
    Trace.WriteLine(await queue.DequeueAsync());

// Asynchronous multi-consumer code
while (true)
{
    var result = await queue.TryDequeueAsync();
    if (!result.Success)
        break;
    Trace.WriteLine(result.Item);
}

// Synchronous consumer code
foreach (var item in queue.GetConsumingEnumerable())
    Trace.WriteLine(item);

Discussion

Even though AsyncProducerConsumerQueue<T> supports a wider range of platforms, I recommend using BufferBlock<T> or ActionBlock<T> if possible because the TPL Dataflow library has been more extensively tested than the Nito.AsyncEx library.

All of the TPL Dataflow blocks as well as AsyncProducerConsumerQueue<T> also support throttling by passing options to their constructors. Throttling is necessary when you have producers that push items faster than your consumers can consume them, which could cause your application to take up large amounts of memory. Platform support for synchronous/asynchronous queues is outlined in Table 8-11.
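For example, here is a minimal sketch of throttling a BufferBlock<T> via its constructor options; with a bounded capacity, SendAsync asynchronously waits when the buffer is full (and Post returns false instead of blocking):

// Limits the buffer to a single item at a time.
var queue = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 1 });

// This send completes immediately.
await queue.SendAsync(7);
// This send (asynchronously) waits until the 7 is received.
await queue.SendAsync(13);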

TIP

The BufferBlock<T> and ActionBlock<T> types are in the Microsoft.Tpl.Dataflow NuGet package. The AsyncProducerConsumerQueue<T> type is in the Nito.AsyncEx NuGet package.

Table 8-11. Platform support for synchronous/asynchronous queues

Platform                  BufferBlock<T> and ActionBlock<T>   AsyncProducerConsumerQueue<T>
.NET 4.5                  Yes                                 Yes
.NET 4.0                  No                                  Yes
Mono iOS/Droid            Yes                                 Yes
Windows Store             Yes                                 Yes
Windows Phone Apps 8.1    Yes                                 Yes
Windows Phone SL 8.0      Yes                                 Yes
Windows Phone SL 7.1      No                                  Yes
Silverlight 5             No                                  Yes

See Also

Recipe 8.6 covers blocking producer/consumer queues.

Recipe 8.8 covers asynchronous producer/consumer queues.

Recipe 4.4 covers throttling dataflow blocks.