Threads and Tasks - The Standard Library - The C++ Programming Language (2013)

Part IV: The Standard Library

42. Threads and Tasks

Keep Calm and Carry On.

– English slogan

Introduction

Threads

Identity; Construction; Destruction; join(); detach(); Namespace this_thread; Killing a thread; thread_local Data

Avoiding Data Races

Mutexes; Multiple Locks; call_once(); Condition Variables

Task-Based Concurrency

future and promise; promise; packaged_task; future; shared_future; async(); A Parallel find() Example

Advice

42.1. Introduction

Concurrency – the execution of several tasks simultaneously – is widely used to improve throughput (by using several processors for a single computation) or to improve responsiveness (by allowing one part of a program to progress while another is waiting for a response).

The C++ standard support for concurrency is introduced in a tutorial manner in §5.3. This chapter and the previous one provide a more detailed and systematic view.

We call an activity potentially executed concurrently with other activities a task. A thread is the system-level representation of a computer’s facilities for executing a task. A thread can execute a task. A thread may share an address space with other threads. That is, all threads in a single address space can access the same memory locations. One of the central challenges of the programmer of a concurrent system is to make sure that threads access memory in a sensible manner.

42.2. Threads

A thread is an abstraction of the computer hardware’s notion of a computation. The C++ standard-library threads are intended to map one-to-one with the operating system’s threads. We use threads when several tasks in a program need to progress concurrently. On a system with several processing units (“cores”), threads allow us to use those units. All threads work in the same address space. If you want hardware protection against data races, use some notion of a process. Stacks are not shared between threads, so local variables are not subject to data races, unless you incautiously pass a pointer to a local variable to another thread. In particular, beware of by-reference context bindings in lambdas (§11.4.3). Deliberate and careful sharing of stack memory is useful and common; for example, we might pass sections of a local array to a parallel sort.

If a thread cannot proceed (e.g., because it has encountered a mutex owned by another thread), it is said to be blocked or asleep.

A thread represents a system resource, a system thread, possibly even with dedicated hardware.

Consequently, a thread can be moved but not copied.

After being the source of a move, a thread no longer represents a thread of computation. In particular, it cannot be join()ed.

The thread::hardware_concurrency() operation reports the number of tasks that can simultaneously proceed with hardware support. The exact meaning of that is architecture-dependent, but it is usually less than the number of threads offered by the operating system (e.g., through time multiplexing or time slicing) and sometimes higher than the number of processors or “cores.” For example, my little two-core laptop reports four hardware threads (it is using what is sometimes called hyper-threading).
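As a small illustration, the value can be used as a hint when deciding how many worker threads to start. The sketch below is not part of the standard library: the name choose_worker_count and its fallback parameter are assumptions made for this example. It relies on the fact that hardware_concurrency() may return 0 when the value is not computable.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: pick a worker count from the hardware hint.
// hardware_concurrency() may return 0 when the value is not computable,
// so fall back to a caller-supplied default in that case.
unsigned choose_worker_count(unsigned fallback = 2)
{
    assert(fallback > 0);
    unsigned n = std::thread::hardware_concurrency();
    return n != 0 ? n : fallback;
}
```

Treat the result as a starting point only; the best number of threads for a given program usually has to be found by measurement.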

42.2.1. Identity

Each thread of execution has a unique identifier represented as a value of type thread::id. If a thread does not represent a thread of execution, its id is the default id{}. The id of a thread t can be obtained by a call of t.get_id().

The id of the current thread can be obtained by this_thread::get_id() (§42.2.6).

A thread can have its id be id{} if

• it has not had a task assigned,

• it has terminated,

• it has been moved from, or

• it has been detach()ed.

Every thread has an id, but a system thread may still be running even though it does not have an id (i.e., after a detach()).

A thread::id can be copied, and ids can be compared with the usual comparison operators (==, <, etc.), output using <<, and hashed with a specialization hash<thread::id> (§31.4.3.4). For example:

void print_id(thread& t)
{
    if (t.get_id()==thread::id{})
        cout << "t not joinable\n";
    else
        cout << "t's id is " << t.get_id() << '\n';
}

Note that cout is a global shared object so that those output statements are not guaranteed to produce output characters in a recognizable sequence unless you make sure that no two threads are writing to cout at the same time (§iso.27.4.1).
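The rules for ids can be checked with a short experiment. The following is a sketch only; the function names has_thread_of_execution and id_demo are invented for this illustration. It avoids cout altogether, so the interleaving issue just mentioned does not arise.

```cpp
#include <cassert>
#include <thread>

// Returns true if t currently represents a thread of execution,
// judged only by comparing its id with the default id.
bool has_thread_of_execution(const std::thread& t)
{
    return t.get_id() != std::thread::id{};
}

// Small demonstration: ids before starting, while running, and after join().
bool id_demo()
{
    std::thread t;                              // no task assigned
    bool empty_before = !has_thread_of_execution(t);

    std::thread::id seen_inside;
    t = std::thread{[&seen_inside]{ seen_inside = std::this_thread::get_id(); }};
    std::thread::id handle_id = t.get_id();     // id as seen through the handle
    bool differs_from_caller = handle_id != std::this_thread::get_id();

    t.join();                                   // after join(), the id is id{} again
    bool empty_after = !has_thread_of_execution(t);

    return empty_before && differs_from_caller && empty_after
        && seen_inside == handle_id;
}
```

Reading seen_inside after the join() is safe because join() synchronizes with the completion of the task.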

42.2.2. Construction

A thread constructor takes a task to be executed and the arguments required by that task. The number and types of arguments must match what the task requires. For example:

void f0(); // no arguments
void f1(int); // one int argument

thread t1 {f0};
thread t2 {f0,1}; // error: too many arguments
thread t3 {f1}; // error: too few arguments
thread t4 {f1,1};
thread t5 {f1,1,2}; // error: too many arguments
thread t6 {f1,"I'm being silly"}; // error: wrong type of argument

After construction, a thread starts executing its task as soon as the run-time system can acquire resources for it to run. Think of that as “immediately.” There is no separate “start the thread” operation.

If you want to build a set of tasks and link them together (e.g., to communicate through message queues), you first build the tasks as function objects and then – when they are all ready to run – start threads. For example:

template<typename T>
class Sync_queue { // a queue providing put() and get() without data races (§42.3.4)
    // ...
};

struct Consumer {
    Sync_queue<Message>& head;
    Consumer(Sync_queue<Message>& q) :head(q) {}
    void operator()(); // get messages from head
};

struct Producer {
    Sync_queue<Message>& tail;
    Producer(Sync_queue<Message>& q) :tail(q) {}
    void operator()(); // put messages on tail
};

Sync_queue<Message> mq;
Consumer c {mq}; // make tasks and "wire them together"
Producer p {mq};

thread pro {p}; // finally: start threads
thread con {c};

// ...

Trying to intersperse thread creation with the setup of connections among the tasks to be run by the threads can easily become complicated and error-prone.

The thread constructors are variadic templates (§28.6). This implies that to pass a reference to a thread constructor, we must use a reference wrapper (§33.5.1). For example:

void my_task(vector<double>& arg);

void test(vector<double>& v)
{
thread my_thread1 {my_task,v}; // oops: pass a copy of v
thread my_thread2 {my_task,ref(v)}; // OK: pass v by reference
thread my_thread3 {[&v]{ my_task(v); }}; // OK: dodge the ref() problem
// ...
}

The problem is that the variadic template uses bind() or some equivalent mechanism, so that a reference is by default dereferenced and the result copied. So, if v was {1,2,3} and my_task increments elements, my_thread1 would never have any effect on v. Note that all three threads have a data race on v; this is an example of calling conventions, not of good concurrent programming style.
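A race-free variant of the by-reference call can be sketched as follows. The names increment_all and increment_in_thread are made up for this example. Note also that many current implementations reject the copying form outright: the copied argument is handed to the task as an rvalue, which cannot bind to a non-const vector<double>&.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Task that modifies its argument in place.
void increment_all(std::vector<double>& v)
{
    for (double& x : v) x += 1;
}

// Runs the task on another thread, passing v by reference via std::ref().
// The join() before returning makes the update visible to the caller
// and ensures that nothing else touches v while the task runs.
void increment_in_thread(std::vector<double>& v)
{
    std::thread t {increment_all, std::ref(v)};
    t.join();
}
```

Joining immediately gives up all concurrency, of course; the point here is only the argument passing.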

A default-constructed thread is primarily useful as the target for a move. For example:

vector<thread> worker(1000); // 1000 default threads

for (int i=0; i!=worker.size(); ++i) {
// ... compute argument for worker[i] and create worker thread tmp ...
worker[i] = move(tmp);
}

Moving a task from one thread to another does not affect its execution. A thread move simply changes what the threads refer to.
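A complete version of that pattern might look like the sketch below. The function run_workers and the trivial task it starts are assumptions made for illustration, not code from this chapter.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <utility>
#include <vector>

// Starts n workers by moving freshly created threads into
// default-constructed slots, then joins them all.
// Returns how many workers actually ran.
int run_workers(std::size_t n)
{
    std::atomic<int> ran {0};
    std::vector<std::thread> workers(n);        // n default threads, none running

    for (std::size_t i = 0; i != workers.size(); ++i) {
        std::thread tmp {[&ran]{ ++ran; }};     // tmp starts running here
        workers[i] = std::move(tmp);            // the handle moves; the task keeps running
        assert(!tmp.joinable());                // a moved-from thread is no longer joinable
    }

    for (std::thread& t : workers) t.join();
    return ran.load();
}
```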

42.2.3. Destruction

Obviously, the thread destructor destroys the thread object. To prevent a system thread from accidentally outliving its thread, the thread destructor calls terminate() to terminate the program if the thread is joinable() (that is, if get_id()!=id{}). For example:

void heartbeat()
{
    while (true) {
        output(steady_clock::now());
        this_thread::sleep_for(seconds{1}); // §42.2.6
    }
}

void run()
{
    thread t {heartbeat};
} // terminate because heartbeat() is still running at the end of t's scope

If you really need to have a system thread proceed beyond the lifetime of its thread, see §42.2.5.

42.2.4. join()

A t.join() tells the current thread not to proceed until t completes. For example:

void tick(int n)
{
    for (int i=0; i!=n; ++i) {
        this_thread::sleep_for(seconds{1}); // §42.2.6
        output("Alive!");
    }
}

int main()
{
thread timer {tick,10};
timer.join();
}

This will output Alive! ten times at about 1-second intervals. Had the timer.join() been missing, the program would have terminated before tick() could have printed anything. The join() made the main program wait for the timer to complete.

As mentioned in §42.2.3, trying to have a thread execute past the end of its scope (or more generally, after its destructor is run) without calling detach() is considered a fatal (for the program) error. However, we can forget to join() a thread. When we view a thread as a resource, we see that we should consider RAII (§5.2, §13.3). Consider a simple test example:

void run(int i, int n) // warning: really poor code
{
thread t1 {f};
thread t2;
vector<Foo> v;
// ...
if (i<n) {
thread t3 {g};
// ...
t2 = move(t3); // move t3 to outer scope
}
v[i] = Foo{}; // might throw
// ...
t1.join();
t2.join();
}

Here, I have made several bad mistakes. In particular:

• We may never reach the two join()s at the end. In that case, the destructor for t1 will terminate the program.

• We may reach the two join()s at the end without the move t2=move(t3) having executed. In that case, t2.join() will terminate the program.

For this kind of thread use, we need a destructor that implicitly join()s. For example:

struct guarded_thread : thread {
    using thread::thread; // §20.3.5.1
    ~guarded_thread() { if (joinable()) join(); }
};

Unfortunately, guarded_thread is not a standard-library class, but in the best RAII tradition guarded_thread makes our code shorter and less error-prone. For example:

void run2(int i, int n) // simple use of a guard
{
guarded_thread t1 {f};
guarded_thread t2;
vector<Foo> v;
// ...
if (i<n) {
thread t3 {g};
// ...
t2 = move(t3); // move t3 to outer scope
}
v[i] = Foo{}; // might throw
// ...
}
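One detail deserves care when trying this out: declaring a destructor suppresses the implicitly generated move operations, so an assignment from a plain thread may not compile unless the guard spells them out. The following is one possible way to do so, a sketch under that assumption rather than a definitive design. The function run_guarded and its simulated failure are invented for the demonstration.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>
#include <thread>
#include <utility>

// A joining thread handle with explicit move operations.
struct guarded_thread : std::thread {
    using std::thread::thread;                          // inherit thread's constructors
    guarded_thread() noexcept = default;
    guarded_thread(std::thread&& t) noexcept : std::thread{std::move(t)} {}
    guarded_thread(guarded_thread&&) noexcept = default;

    // Assigning to a joinable std::thread would call terminate(), so join first.
    guarded_thread& operator=(std::thread&& t)
    {
        join_if_needed();
        std::thread::operator=(std::move(t));
        return *this;
    }
    guarded_thread& operator=(guarded_thread&& other)
    {
        if (this != &other) {
            join_if_needed();
            std::thread::operator=(static_cast<std::thread&&>(other));
        }
        return *this;
    }
    ~guarded_thread() { join_if_needed(); }

private:
    void join_if_needed() { if (joinable()) join(); }
};

// Demonstration: the scope may be left early (here by an exception), yet
// both threads are joined by the destructors before the counter is read.
int run_guarded(bool fail)
{
    std::atomic<int> done {0};
    try {
        guarded_thread t1 {[&done]{ ++done; }};
        guarded_thread t2;
        {
            std::thread t3 {[&done]{ ++done; }};
            t2 = std::move(t3);                         // move t3 to the outer scope
        }
        if (fail) throw std::runtime_error{"simulated failure"};
    }
    catch (const std::runtime_error&) {
        // t1 and t2 were already joined during stack unwinding
    }
    return done.load();
}
```

Whether the exception is thrown or not, both tasks have completed by the time run_guarded() returns.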

But why doesn’t the thread’s destructor just join()? There is a long-standing tradition of using system threads that “live forever” or decide for themselves when to terminate. Had it worked, the timer executing tick() (§42.2.2) would have been an example of such a thread. Threads monitoring data structures provide many more examples. Such threads (and processes) are often called daemons. Another use for detached threads is to simply initiate a thread to complete a task and forget about it. Doing so leaves the “housekeeping” to the run-time system.

42.2.5. detach()

Accidentally letting a thread try to execute beyond its destructor is considered a very bad error. If you really want a system thread to outlive its thread (handle), use detach(). For example:

void run2()
{
thread t {heartbeat};
t.detach(); // let heartbeat run independently
}

I have a philosophical problem with detached threads. Given a choice, I would prefer to

• know exactly which threads are running,

• be able to determine if threads are making progress as expected,

• be able to check if threads that are supposed to delete themselves really do so,

• be able to know whether it is safe to use the results of a thread,

• be sure that all resources associated with a thread are properly released, and

• be sure that a thread does not try to access objects from the scope in which it was created after that scope has been destroyed.

Unless I go beyond the standard library (e.g., using native_handle() and “native” system facilities), I cannot do so for detached threads. Also, how do I debug a system where the behavior of detached threads cannot be directly observed? What happens if a detached thread holds a pointer to something in the scope in which it was created? That could lead to corrupted data, a system crash, or a security violation. Yes, obviously detached threads can be useful and debugged. After all, people have been doing it for decades. But people have been doing self-destructive things for centuries and deemed them useful. Given a choice, I prefer not to detach() threads.

Note that thread provides a move assignment and a move constructor. This allows threads to migrate out of the scope in which they were constructed and often provides an alternative to detach(). We can migrate threads to a “main module” of a program, access them through unique_ptrs or shared_ptrs, or place them in a container (e.g., vector<thread>) to avoid losing track of them. For example:

vector<thread> my_threads; // keep otherwise detached threads here

void run()
{
thread t {heartbeat};
my_threads.push_back(move(t));
// ...
my_threads.emplace_back(tick,1000);
}

void monitor()
{
for (thread& t : my_threads)
cout << "thread " << t.get_id() << '\n';
}

For a more realistic example, I would associate some information with each thread in my_threads. Maybe I’d even launch monitor as a task.

If you must detach() a thread, do make sure that it does not refer to variables in its scope. For example:

void home() // don't do this
{
    int var;
    thread disaster {[&]{ this_thread::sleep_for(milliseconds{7300}); ++var; }};
    disaster.detach();
}

Apart from the warning comment and the evocative name, this code looks quite innocent. It is not: the system thread invoked by disaster() will “forever” keep writing to the address where home()’s var was allocated, corrupting any data that may later be allocated there. This kind of error is extremely hard to find because it is only loosely connected to the code in which it manifests itself, and repeated runs of the program will have different results – many runs may exhibit no symptoms. Such bugs have been called Heisenbugs in honor of the discoverer of the uncertainty principle.

Note that the root problem in that example is a violation of the simple and well known rule “Don’t pass a pointer to a local object out of its scope” (§12.1.4). However, with a lambda, it is easy (and almost invisible) to create a pointer to a local variable: [&]. Fortunately, we have to say detach() to allow a thread to exit its scope; don’t do that without a very good reason, and then only after carefully considering what its task might do.

42.2.6. Namespace this_thread

Operations for the current thread are found in namespace this_thread:

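• get_id(): the id of the current thread

• yield(): give the scheduler the opportunity to run another thread

• sleep_until(tp): put the current thread to sleep until time_point tp

• sleep_for(d): put the current thread to sleep for duration d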

To get the identity of the current thread, call this_thread::get_id(). For example:

void helper(thread& t)
{
thread::id me {this_thread::get_id()};
// ...
if (t.get_id()!=me) t.join();
// ...
}

Similarly, we can use this_thread::sleep_until(tp) and this_thread::sleep_for(d) to put the current thread to sleep.
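The two sleep operations differ only in how the wake-up time is expressed. The sketch below, with the invented names timed_nap and timed_nap_until, measures how long each call actually blocked. It uses steady_clock throughout so that adjustments of the system clock do not affect the measurement.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Sleeps for d and returns the time that actually elapsed.
std::chrono::steady_clock::duration timed_nap(std::chrono::milliseconds d)
{
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(d);                 // relative: "for this long"
    return std::chrono::steady_clock::now() - start;
}

// Same idea using an absolute deadline.
std::chrono::steady_clock::duration timed_nap_until(std::chrono::milliseconds d)
{
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_until(start + d);       // absolute: "until this point in time"
    return std::chrono::steady_clock::now() - start;
}
```

Both calls block for at least the requested time; how much longer depends on the scheduler and the system load.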

The this_thread::yield() is used to give another thread a chance to proceed. The current thread is not blocked, so it will eventually be run again without any other thread having to do anything specific to wake it. Thus, yield() is primarily useful for waiting for an atomic to change state and for cooperative multi-threading. Usually, it is better to use sleep_for(n) than to just yield(). The argument to sleep_for() gives the scheduler a better chance to make rational choices about which threads to run when. Consider yield() a feature for optimization in very rare and specialized cases.

On all major implementations threads are preemptable; that is, the implementation can switch from one task to another to ensure that all threads progress at a reasonable rate. However, for historical and language technical reasons, preemption is only encouraged rather than required by the standard (§iso.1.10).

Usually, programmers should not mess with system clocks. But if a clock is reset (say, because it has drifted from the true time), a sleep_until() would be affected, but not a sleep_for(). The same applies to try_lock_until() and try_lock_for() for a timed_mutex (§42.3.1.3).

42.2.7. Killing a thread

I find one important operation missing from thread. There is no simple standard way of telling a running thread that I have lost interest in its task, so would it please stop running and release all its resources. For example, if I start a parallel find() (§42.4.7), I would often like to ask remaining tasks to stop once I have an answer. There are various historical and technical reasons for the lack of this operation (called kill, cancel, and interrupt in various languages and systems).

If needed, application programmers can write their own versions of this idea. For example, many tasks involve a request loop. In that case, a “kindly commit suicide” message would allow the receiving thread to release all resources and then terminate. If there is no request loop, a task could periodically examine a “needed” variable to see if results are still wanted.

So, a general cancel operation may be difficult to design and to implement on all systems, but I have never seen an application where a specific cancellation mechanism wasn’t relatively easy to implement.
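The “needed” variable idea can be sketched with an atomic flag. Everything here (work_until_cancelled, cancel_demo, and the progress counter) is a hypothetical illustration of one application-specific mechanism, not a general cancellation facility.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <thread>

// Worker: does one unit of "work" per iteration and checks the flag each time,
// so it stops at a well-defined point and can release its resources normally.
void work_until_cancelled(const std::atomic<bool>& needed, std::atomic<long>& progress)
{
    while (needed.load()) {
        ++progress;
        std::this_thread::yield();      // give other threads a chance to run
    }
}

// Starts the worker, waits until it has made some progress, then asks it to stop.
// Returns the number of iterations the worker completed.
long cancel_demo()
{
    std::atomic<bool> needed {true};
    std::atomic<long> progress {0};

    std::thread worker {work_until_cancelled, std::cref(needed), std::ref(progress)};

    while (progress.load() == 0) std::this_thread::yield();
    needed = false;                     // "results are no longer wanted"
    worker.join();                      // returns because the worker noticed the flag
    return progress.load();
}
```

The worker decides for itself where it is safe to stop; the requester only expresses the wish.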

42.2.8. thread_local Data

As indicated by its name, a thread_local variable is an object owned by a thread and not accessible from other threads unless its owner (incautiously) gives them a pointer to it. In that, a thread_local resembles a local variable, but a local variable has its lifetime and access limited by its scope within a function, whereas a thread_local is shared among all functions of a thread and “lives” for as long as the thread. A thread_local object can be extern.

For most uses, having objects local (on the stack) is preferable to having them shared; thread_local storage shares the logical problems of global variables. As usual, namespaces can be used to limit the problems with nonlocal data. However, on many systems, the amount of stack storage for a thread is rather limited, so thread_local storage becomes important for tasks that require large amounts of nonshared data.

A thread_local is said to have thread storage duration (§iso.3.7.2). Each thread has its own copy of its thread_local variables. A thread_local is initialized before its first use (§iso.3.2). If constructed, it will be destroyed on thread exit.
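That each thread has its own copy can be seen in a small experiment. The counter and the two functions below are assumptions introduced only for this sketch.

```cpp
#include <cassert>
#include <thread>

// Each thread gets its own copy of this counter, initialized to zero.
thread_local int calls_on_this_thread = 0;

// Increments and returns the calling thread's own counter.
int note_call() { return ++calls_on_this_thread; }

// Calls note_call() n times on a new thread and returns that thread's final count.
int count_on_new_thread(int n)
{
    int result = 0;
    std::thread t {[&result, n]{
        for (int i = 0; i != n; ++i) result = note_call();
    }};
    t.join();
    return result;
}
```

Every new thread starts counting from zero, and the counter of the calling thread is unaffected by what the other threads do.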

An important use of thread_local storage is for a thread to explicitly keep a cache of data for exclusive access. That can complicate the program logic, but on machines with shared caches it can sometimes deliver very significant performance advantages. Also, it can simplify and/or lower the cost of locking by transferring data only in larger batches.

In general, nonlocal memory is a problem for concurrent programming because it is often nontrivial to determine if it is shared and thus a possible source of data races. In particular, static class members can be a major problem because they are often hidden from the users of a class, so that potential data races are easily missed. Consider a Map design with a per-type default value:

template<typename K, typename V>
class Map {
public:
Map();
// ...
static void set_default(const K&,const V&); // set default for all Maps of type Map<K,V>
private:
static pair<const K,V> default_value;
};

Why would a user suspect a data race on two different Map objects? Obviously, a user who spotted set_default() among the members might suspect, but set_default() is an easily overlooked minor feature (§16.2.12).

One-per-class (static) values used to be popular. They include default values, use counters, caches, free lists, answers to frequently asked questions, and many obscure uses. When used in a concurrent system, we have a classic problem:

// somewhere in thread 1:
Map<string,int>::set_default("Heraclides",1);

// somewhere in thread 2:
Map<string,int>::set_default("Zeno",1);

This is a potential data race: which thread gets to first execute set_default()?

Adding thread_local helps:

template<typename K, typename V>
class Map {
// ...
private:
static thread_local pair<const K,V> default_value;
};

Now, there is no longer a potential data race. However, there is no longer a single default_value shared among all users either. In the example, thread 1 will never see the effect of a set_default() in thread 2. As often as not, that was not what was intended in the original code, so by adding thread_local, we simply exchanged one error for another. Consider static data members suspect (always, because you don’t know if your code might someday be executed as part of a concurrent system), and do not consider thread_local a panacea.

A namespace variable, a local static, and a class static member can be declared thread_local. As for local static variables, the construction of a thread_local local variable is protected by a first-time switch (§42.3.3). The order of construction of thread_locals is undefined, so keep the construction of different thread_locals independent of their order and use compile-time or link-time initialization whenever possible. Like static variables, thread_locals are by default initialized to zero (§6.3.5.1).

42.3. Avoiding Data Races

The best way to avoid data races is not to share data. Keep interesting data in local variables, in free store not shared with other threads, or in thread_local memory (§42.2.8). Do not pass pointers to such data to other threads. When such data needs to be processed by another thread (e.g., by a parallel sort), pass pointers to a specific section of the data and make sure not to touch that section of the data passed until after the termination of the task.

These simple rules are based on the idea of avoiding attempts to simultaneously access data, so they don’t require locking and lead to maximally efficient programs. Where they cannot be used, for example, because lots of data needs to be shared, use some form of locking:

• Mutexes: A mutex (a mutual exclusion variable) is an object used to represent the exclusive right to access some resource. To access the resource, acquire the mutex, access, and then release the mutex (§5.3.4, §42.3.1).

• Condition variables: A condition variable is a variable used by a thread to wait for an event generated by another thread or a timer (§5.3.4.1, §42.3.4).

Strictly speaking, condition variables do not prevent data races. Rather, they save us from having to introduce shared data that might become a source of data races.

42.3.1. Mutexes

A mutex is an object used to represent exclusive access to some resource. Thus, it can be used to protect against data races and to synchronize access to data shared between multiple threads.

The “plain” mutex is the simplest, smallest, and fastest mutex. In exchange for added functionality, recursive and timed mutexes carry a small cost, which may or may not be significant for a given application on a given machine.

Only one thread can own a mutex at any one time:

• To acquire a mutex means to gain exclusive ownership of it; an acquire operation may block the thread executing it.

• To release a mutex means relinquishing exclusive ownership; a release operation will allow another thread to eventually acquire the mutex. That is, a release operation will unblock waiting threads.

If several threads are blocked on a mutex the system scheduler could in principle select the thread to be unblocked in such a way that some unfortunate thread would never get to run. This is called starvation, and a scheduling algorithm that avoids starvation by giving each thread an equal chance to make progress is said to be fair. For example, a scheduler might always choose the thread with the highest thread::id to run next, thereby starving a thread with a low id. The standard does not guarantee fairness, but in reality schedulers are “reasonably fair.” That is, they make it extremely unlikely that a thread starves forever. For example, a scheduler may pick the next thread to run randomly among those blocked.

By itself, a mutex doesn’t do anything. Instead, we use a mutex to represent something else. We use ownership of a mutex to represent the right to manipulate a resource, such as an object, some data, or an I/O device. For example, we could define a cout_mutex to represent the right to use cout from a thread:

mutex cout_mutex; // represent the right to use cout

template<typename Arg1, typename Arg2 = const char*, typename Arg3 = const char*>
void write(Arg1 a1, Arg2 a2 = "", Arg3 a3 = "")
{
    thread::id name = this_thread::get_id();
    cout_mutex.lock();
    cout << "From thread " << name << " : " << a1 << a2 << a3;
    cout_mutex.unlock();
}

If all threads use write() we should get output from different threads properly separated. The snag is that every thread has to use a mutex as intended. The correspondence between a mutex and its resource is implicit. In the cout_mutex example, a thread using cout directly (bypassing cout_mutex) can corrupt output. The standard guarantees that the cout variable is protected against corruption, but there is no protection against output from different threads becoming intermixed.

Note that I locked the mutex only for the one statement that required the lock. To minimize contention and the chances of a thread becoming blocked, we try to minimize the time a lock is held by locking only where it is essential to do so. A section of code protected by a lock is called a critical section. To keep code fast and free of problems related to locking, we minimize the size of critical sections.
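The same principle applies to computation: do as much as possible on local data and take the lock only for the update of shared data. Here is a minimal sketch, assuming a shared total protected by a mutex; the names total_mutex, total, add_range, and parallel_sum are invented for this example.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

std::mutex total_mutex;     // represents the right to update total
long total = 0;             // shared; only touched while total_mutex is held

// Sums its own range without any locking, then takes the lock
// only for the single statement that touches shared data.
void add_range(long first, long last)
{
    long local = 0;
    for (long i = first; i != last; ++i) local += i;    // no lock needed here

    total_mutex.lock();
    total += local;                                     // the critical section
    total_mutex.unlock();
}

// Splits [0,n) across k threads and returns the combined sum.
long parallel_sum(long n, int k)
{
    assert(k > 0 && n >= 0);
    total_mutex.lock();
    total = 0;
    total_mutex.unlock();

    std::vector<std::thread> workers;
    long chunk = n / k;
    for (int i = 0; i != k; ++i) {
        long first = i * chunk;
        long last = (i == k-1) ? n : first + chunk;     // last worker takes the remainder
        workers.emplace_back(add_range, first, last);
    }
    for (std::thread& t : workers) t.join();
    return total;           // safe to read: all workers have been joined
}
```

Each thread holds the lock for one addition only, however large its range is.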

The standard-library mutexes provide exclusive ownership semantics. That is, a single thread (at a time) has exclusive access to the resource. There are other kinds of mutexes. For example, multiple-reader-single-writer mutexes are popular, but the standard library does not (yet) offer one. If you need a different kind of mutex, use one offered by a specific system or write it yourself.

42.3.1.1. mutex and recursive_mutex

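Class mutex offers a simple set of operations: m.lock() acquires m (blocking if necessary), m.try_lock() tries to acquire m without blocking and reports whether it succeeded, and m.unlock() releases m.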

A mutex cannot be copied or moved. Think of a mutex as a resource, rather than a handle of a resource. In fact, a mutex is typically implemented as a handle to a system resource, but since that system resource cannot be shared, leaked, copied, or moved, it is usually a spurious complication to think of them as separate.

The basic uses of a mutex are very simple. For example:

mutex cout_mutex; // initialized to "not owned by any thread"

void hello()
{
cout_mutex.lock();
cout << "Hello, ";
cout_mutex.unlock();
}
void world()
{
cout_mutex.lock();
cout << "World!";
cout_mutex.unlock();
}

int main()
{
thread t1 {hello};
thread t2 {world};

t1.join();
t2.join();
}

Given that, we will get the output

Hello, World!

or

World! Hello,

We will not get cout corrupted or some mixed-up output characters.

The try_lock() operation is used when we have some other work we might usefully do if some other thread is using a resource. As an example, consider a work generator that composes work requests for other tasks and places them on a work queue:

extern mutex wqm;
extern list<Work> wq;

void composer()
{
    list<Work> requests;

    while (true) {
        for (int i=0; i!=10; ++i) {
            Work w;
            // ... compose work request ...
            requests.push_back(w);
        }
        if (wqm.try_lock()) {
            wq.splice(wq.end(),requests); // splice() requests into the list (§31.4.2)
            wqm.unlock();
        }
    }
}

When some server thread is examining wq, the composer() simply makes some more work instead of waiting.

When using locks, we have to beware of deadlock. That is, we must not wait for a lock that can never be released. The simplest form of deadlock requires only one lock and one thread. Consider a variant of the thread-safe output operation:

template<typename Arg, typename... Args>
void write(Arg a, Args... tail)
{
    cout_mutex.lock();
    cout << a;
    write(tail...);
    cout_mutex.unlock();
}

Now, if a thread calls write("Hello,","World!"), it will deadlock with itself when it tries the recursive call for the tail.

Recursive and mutually recursive calls are common enough for the standard to provide a solution. A recursive_mutex is just like a plain mutex, except that a single thread can acquire it repeatedly. For example:

recursive_mutex cout_mutex; // changed to recursive_mutex to avoid deadlock

template<typename Arg, typename... Args>
void write(Arg a, Args... tail)
{
    cout_mutex.lock();
    cout << a;
    write(tail...);
    cout_mutex.unlock();
}

Now the recursive call of write() is correctly handled by cout_mutex.
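To try this out, the recursion also needs a case for an empty argument list. The following self-contained sketch adds one. It writes to an ostringstream named out, a stand-in for cout chosen here so that the result can be inspected; out_mutex and written() are likewise assumptions of this example.

```cpp
#include <cassert>
#include <mutex>
#include <sstream>
#include <string>

std::recursive_mutex out_mutex;     // represents the right to use out
std::ostringstream out;             // stand-in for cout so the result can be inspected

// Base case: nothing left to write. Without it the recursion has no end point.
void write() {}

// Writes the first argument, then recurses on the rest.
// Every level of the recursion locks out_mutex again, which a
// recursive_mutex permits for the thread that already owns it.
template<typename Arg, typename... Args>
void write(Arg a, Args... tail)
{
    out_mutex.lock();
    out << a;
    write(tail...);
    out_mutex.unlock();
}

// Returns everything written so far.
std::string written()
{
    out_mutex.lock();
    std::string s = out.str();
    out_mutex.unlock();
    return s;
}
```

Each lock() is matched by an unlock() at the same level of the recursion, so the mutex is fully released when the outermost call returns.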

42.3.1.2. mutex Errors

Trying to manipulate a mutex can fail. If so, the mutex operation throws a system_error. Some of the possible errors reflect conditions in the underlying system.

For example:

mutex mtx;
try {
mtx.lock();
mtx.lock(); // try to lock a second time
}
catch (system_error& e) {
mtx.unlock();
cout << e.what() << '\n';
cout << e.code() << '\n';
}

I got the output

device or resource busy
generic: 16

This looks like a good argument for using a lock_guard or a unique_lock (§42.3.1.4).

42.3.1.3. timed_mutex and recursive_timed_mutex

A simple mtx.lock() is unconditional. If we don’t want to block, we can use mtx.try_lock(), but when we fail to acquire mtx, we often want to wait for a while before trying again. The timed_mutex and recursive_timed_mutex offer support for that by adding try_lock_for(d) and try_lock_until(tp) to the plain mutex operations.

The recursive_timed_mutex interface is identical to the timed_mutex interface (just as the recursive_mutex interface is identical to the mutex interface).

For this_thread, we can sleep_until(tp) a time_point and sleep_for(d) a duration (§42.2.6). More generally, we can m.try_lock_until(tp) or m.try_lock_for(d) for a timed_mutex m. If tp is before the current point in time or d is less than or equal to zero, the operation is equivalent to a “plain” try_lock().

As an example, consider updating an output buffer with a new image (e.g., in a video game or a visualization):

extern timed_mutex imtx;
extern Image buf;

void next()
{
    while (true) {
        Image next_image;
        // ... compute ...

        if (imtx.try_lock_for(milliseconds{100})) {
            buf = next_image;
            imtx.unlock();
        }
    }
}

The assumption here is that if the image cannot be updated reasonably fast (here, in 100 milliseconds), the user would prefer a newer version of the image. Further, it is assumed that missing an image in a sequence of updated images will rarely be noticed, so that a more complicated solution is not needed.
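The timeout behavior itself can be observed in isolation. In the sketch below, can_acquire_within and timeout_demo are hypothetical names. One thread holds a timed_mutex while another tries to get it with a 20-millisecond limit, and then the attempt is repeated after the mutex has been released.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

// Tries to take m within the given time; reports whether it succeeded.
// On success the mutex is released again before returning.
bool can_acquire_within(std::timed_mutex& m, std::chrono::milliseconds timeout)
{
    if (m.try_lock_for(timeout)) {
        m.unlock();
        return true;
    }
    return false;       // timed out: the caller can do something else
}

// Demonstration: while this thread holds the mutex, another thread's
// timed attempt fails; once the mutex is released, a second attempt succeeds.
bool timeout_demo()
{
    std::timed_mutex m;
    bool while_held = true;
    bool after_release = false;

    m.lock();
    std::thread t1 {[&]{ while_held = can_acquire_within(m, std::chrono::milliseconds{20}); }};
    t1.join();          // m was held for the whole attempt
    m.unlock();

    std::thread t2 {[&]{ after_release = can_acquire_within(m, std::chrono::milliseconds{20}); }};
    t2.join();

    return !while_held && after_release;
}
```

The standard does not strictly promise that an available mutex is always obtained, but implementations are expected to make a strong effort, and in practice the second attempt succeeds.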

42.3.1.4. lock_guard and unique_lock

A lock is a resource, so we must not forget to release it. That is, each m.lock() operation must be matched by an m.unlock() operation. The usual opportunities for mistakes exist; for example:

void use(mutex& mtx, Vector<string>& vs, int i)
{
mtx.lock();
if (i<0) return;
string s = vs[i];
// ...
mtx.unlock();
}

The mtx.unlock() is there, but if i<0 or if i is out of vs’s range and vs is range checked, the thread of execution never gets to the mtx.unlock() and mtx may be locked forever.

The standard library provides two RAII classes, lock_guard and unique_lock, to handle such problems.

The “plain” lock_guard is the simplest, smallest, and fastest guard. In exchange for added functionality, unique_lock carries a small cost, which may or may not be significant for a given application on a given machine.

For example:

void use(mutex& mtx, vector<string>& vs, int i)
{
lock_guard<mutex> g {mtx};
if (i<0) return;
string s = vs[i];
// ...
}

The lock_guard’s destructor does the necessary unlock() on its argument.

As usual, we should only hold a lock for the minimal amount of time, so a lock_guard should not become an excuse for holding a lock until the end of a large scope if we only need the lock for a small section of the scope. Obviously, the checking of i does not require locking, so we could do that before acquiring the lock:

void use(mutex& mtx, vector<string>& vs, int i)
{
if (i<0) return;
lock_guard<mutex> g {mtx};
string s = vs[i];
// ...
}

Furthermore, imagine that we only needed the lock for the read of vs[i]. Then, we could put the lock_guard in a small scope:

void use(mutex& mtx, vector<string>& vs, int i)
{
if (i<0) return;
string s;
{
lock_guard<mutex> g {mtx};
s = vs[i];
}
// ...
}

Is such complication of the code worthwhile? Without looking at the code “hidden in the ...” we cannot tell, but we should definitely not use a lock_guard just out of unwillingness to consider where locking is needed. Minimizing the size of critical sections is in general a useful thing to do. If nothing else, it forces us to think about exactly where a lock is needed and why.

So, a lock_guard (and also a unique_lock) is a resource handle (“a guard”) for an object that you can lock to acquire ownership and unlock to release.

Such an object is called a lockable object. The obvious lockable object is of a standard-library mutex type, but users can define their own.
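
A user-defined lockable object can be as small as a class with lock() and unlock() members. The following is a minimal sketch, not a usable mutex: Tracing_lock and released_after_scope() are hypothetical names introduced here, and the type only records its state so that the guard's behavior can be observed.

```cpp
#include <mutex>
#include <stdexcept>

// Hypothetical lockable type: records lock state instead of blocking.
// Not thread-safe; it exists only to make the guard's behavior observable.
struct Tracing_lock {
    bool locked = false;
    int acquisitions = 0;
    void lock() { locked = true; ++acquisitions; }
    void unlock() { locked = false; }
};

// Returns true if the lock was held inside the scope and released on exit,
// whether the scope is left normally or by an exception.
bool released_after_scope(bool throw_inside)
{
    Tracing_lock tl;
    bool held_inside = false;
    try {
        std::lock_guard<Tracing_lock> g {tl};   // calls tl.lock()
        held_inside = tl.locked;
        if (throw_inside) throw std::runtime_error{"leave scope by exception"};
    }                                           // g's destructor calls tl.unlock()
    catch (const std::runtime_error&) {
        // the guard has already released tl when we get here
    }
    return held_inside && !tl.locked && tl.acquisitions==1;
}
```

Both released_after_scope(false) and released_after_scope(true) should report that the lock was released, which is the point of using a guard rather than explicit lock()/unlock() pairs.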

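A lock_guard is a very simple class with no interesting operations. All it does is RAII for a mutex. To get an object that provides RAII and operations on a contained mutex, we use a unique_lock. In addition to lock(), try_lock(), and unlock(), it offers the timed operations try_lock_for() and try_lock_until(), as well as owns_lock() and release().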

Obviously, the timed operations are only allowed if the contained mutex is a timed_mutex or a recursive_timed_mutex.

For example:

mutex mtx;
timed_mutex mtx2;

void use()
{
unique_lock<mutex> lck {mtx,defer_lock}; // don't acquire mtx yet
unique_lock<timed_mutex> lck2 {mtx2,defer_lock}; // don't acquire mtx2 yet

lck.try_lock_for(milliseconds{2}); // error: mutex does not have member try_lock_for()

lck2.try_lock_for(milliseconds{2}); // OK
lck2.try_lock_until(steady_clock::now()+milliseconds{2});
// ...
}

If you give a duration or a time_point as a second argument to a unique_lock, the constructor will execute the appropriate try operation. The owns_lock() operation allows us to check whether such an acquisition succeeded. For example:

timed_mutex mtx2;

void use2()
{
unique_lock<timed_mutex> lck2 {mtx2,milliseconds{2}};
if (lck2.owns_lock()) {
// acquisition succeeded:
// ... do something ...
}
else {
// timeout:
// ... do something else ...
}
}
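
The timeout case can be provoked deliberately. In the following sketch (second_thread_times_out() is a hypothetical name), the calling thread holds a timed_mutex while a second thread tries to acquire it for 20 milliseconds, so the second thread's owns_lock() is expected to report failure.

```cpp
#include <chrono>
#include <mutex>
#include <thread>

// Returns true if a second thread timed out trying to acquire a timed_mutex
// that the calling thread holds for the whole attempt.
bool second_thread_times_out()
{
    std::timed_mutex tm;
    bool acquired = true;
    std::unique_lock<std::timed_mutex> holder {tm};   // this thread owns tm
    std::thread t {[&] {
        // the constructor calls tm.try_lock_for(20ms)
        std::unique_lock<std::timed_mutex> lck {tm,std::chrono::milliseconds{20}};
        acquired = lck.owns_lock();                   // false: the try timed out
    }};
    t.join();                                         // after join() it is safe to read acquired
    return !acquired;
}   // holder releases tm here
```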

42.3.2. Multiple Locks

It is fairly common to want to acquire multiple resources to do some task. Unfortunately, acquiring two locks implies the opportunity of deadlock. For example:

mutex mtx1; // protects one resource
mutex mtx2; // protects another resource

void task(mutex& m1, mutex& m2)
{
unique_lock<mutex> lck1 {m1};
unique_lock<mutex> lck2 {m2};
// ... use resources ...
}
thread t1 {task,ref(mtx1),ref(mtx2)};
thread t2 {task,ref(mtx2),ref(mtx1)};

The ref() is the std::ref() reference wrapper from <functional> (§33.5). It is needed to pass a reference through a variadic template (the thread constructor; §42.2.2). A mutex cannot be copied or moved, so I must pass it by reference (or use a pointer).

Change the names from mtx1 and mtx2 to something that does not indicate order and separate the definitions of t1 and t2 from each other in the source text and it will no longer be obvious that there is a good chance that the program will eventually deadlock with t1 owning mtx1, t2 owning mtx2, and each trying to acquire its second mutex forever.

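The standard library offers lock() and try_lock() for acquiring several locks at once. A call lock(locks...) acquires all of its arguments without risk of deadlock, blocking as needed. A call try_lock(locks...) tries to acquire each argument in order; it returns -1 if all were acquired, and otherwise releases any locks it acquired and returns the index of the lock it could not acquire.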

The actual algorithm for try_lock() is unspecified, but one possibility would be:

template <typename M1>
int try_lock(M1& mtx) // base case; declared first so that the recursive call can find it
{
return (mtx.try_lock()) ? -1 : 0;
}

template <typename M1, typename... Mx>
int try_lock(M1& mtx, Mx&... tail)
{
if (mtx.try_lock()) {
int n = try_lock(tail...);
if (n == -1) return -1; // all locks acquired
mtx.unlock(); // back out
return n+1; // index of the lock that could not be acquired
}
return 0; // couldn't acquire mtx
}

Given lock(), the buggy task() can be simplified and corrected:

void task(mutex& m1, mutex& m2)
{
unique_lock<mutex> lck1 {m1,defer_lock};
unique_lock<mutex> lck2 {m2,defer_lock};
lock(lck1,lck2);
// ... use resources ...
}

Note that applying lock() directly to the mutexes, lock(m1,m2), rather than to the unique_locks would have left the programmer with the obligation to explicitly release m1 and m2.
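
To see the corrected pattern at work, here is a minimal sketch in which two threads acquire the same pair of mutexes in opposite orders. Account, transfer(), and run_opposing_transfers() are hypothetical names, not part of the standard library or of the text above.

```cpp
#include <mutex>
#include <thread>
#include <utility>

struct Account {          // hypothetical resource protected by its own mutex
    std::mutex mtx;
    long balance = 0;
};

void transfer(Account& from, Account& to, long amount)
{
    std::unique_lock<std::mutex> l1 {from.mtx,std::defer_lock};
    std::unique_lock<std::mutex> l2 {to.mtx,std::defer_lock};
    std::lock(l1,l2);     // acquire both, whatever order other threads use
    from.balance -= amount;
    to.balance += amount;
}   // l2 and l1 release their mutexes

// Two threads transfer in opposite directions, so they name the mutexes in opposite orders.
std::pair<long,long> run_opposing_transfers(int n)
{
    Account a, b;
    a.balance = b.balance = 1000;
    std::thread t1 {[&]{ for (int i=0; i!=n; ++i) transfer(a,b,1); }};
    std::thread t2 {[&]{ for (int i=0; i!=n; ++i) transfer(b,a,2); }};
    t1.join();
    t2.join();
    return {a.balance,b.balance};
}
```

With n==500, a gains 500 in total and b loses 500, so the balances end at 1500 and 500 whatever the interleaving.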

42.3.3. call_once()

We often want to initialize an object without getting into a race condition. The type once_flag and the function call_once() offer a low-level, efficient, and simple tool for that.

For example:

class X {
public:
X();
// ...
private:
// ...
static once_flag static_flag;
static Y static_data_for_class_X;
static void init();
};

X::X()
{
call_once(static_flag,init); // pass the function itself; call_once() calls it at most once
}

One way to think about call_once() is as a way to simply modify preconcurrency code that relies on initialized static data.
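
The effect can be observed with a small experiment. In this sketch (init_flag, init_count, init(), and run_call_once() are hypothetical names), several threads race to call the same initializer through the same once_flag, and the initializer runs only once.

```cpp
#include <mutex>
#include <thread>
#include <vector>

std::once_flag init_flag;
int init_count = 0;

void init() { ++init_count; }   // run at most once, so the increment is not a data race

// Starts nthreads threads that all try to initialize; returns how often init() ran in total.
int run_call_once(int nthreads)
{
    std::vector<std::thread> ts;
    for (int i=0; i!=nthreads; ++i)
        ts.emplace_back([]{ std::call_once(init_flag,init); });
    for (auto& t : ts) t.join();
    return init_count;
}
```

Because the flag is a global, a second call of run_call_once() does not run init() again.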

Run-time initialization of a local static variable is implemented by call_once() or by a mechanism very similar to call_once(). Consider:

Color& default_color() // user code
{
static Color def { read_from_environment("background color") };
return def;
}

This may be implemented as

Color& default_color() // generated code
{
static Color def;
static once_flag __def;
call_once(__def,[]{ def = read_from_environment("background color"); });
return def;
}

I use the double underscore prefix (§6.3.3) to emphasize that this latter version represents compiler-generated code.

42.3.4. Condition Variables

Condition variables are used to manage communication among threads. A thread can wait (block) on a condition_variable until some event, such as reaching a specific time or another thread completing, occurs.

A condition_variable may (or may not) rely on system resources, so a constructor may fail for lack of such a resource. However, like a mutex, a condition_variable cannot be copied or moved, so it is best to think of a condition_variable as a resource in itself, rather than as a handle.

When a condition_variable is destroyed, all waiting threads (if any) must be notified (i.e., told to wake up) or they may wait forever.

The status returned by wait_until() and wait_for() is defined as:

enum class cv_status { no_timeout, timeout };

A condition_variable’s unique_lock is used by the wait functions to prevent wake-ups being lost due to contention on the unique_lock’s list of waiting threads.

The “plain” wait(lck) is a low-level operation that should be used with extra care and usually in the implementation of some higher-level abstraction. It can wake up “spuriously.” That is, the system may decide to resume wait()’s thread even though no other thread has notified it! Apparently, allowing spurious wake-up simplifies implementation of condition_variables on some systems. Always use “plain” wait() in a loop. For example:

while (queue.empty()) wait(queue_lck);

An additional reason for this loop is that some thread may have “snuck up” and invalidated the condition (here, queue.empty()) before the thread calling the unconditional wait() got to run. Such a loop basically is the implementation of a wait with a condition, so prefer those over the unconditional wait().

A thread can wait for an amount of time:

void simple_timer(int delay)
{
condition_variable timer;
mutex mtx; // mutex protecting timer
auto t0 = steady_clock::now();
unique_lock<mutex> lck(mtx); // acquire mtx
timer.wait_for(lck,milliseconds{delay}); // release and reacquire mtx
auto t1 = steady_clock::now();
cout << duration_cast<milliseconds>(t1-t0).count() << " milliseconds passed\n";
} // implicitly release mtx

This basically shows the implementation of this_thread::sleep_for(). The mutex protects wait_for() against data races. The wait_for() releases its mutex as it goes to sleep and reacquires it as its thread is unblocked. Finally, lck (implicitly) releases the mutex at the end of its scope.

Another simple use of a condition_variable is to control the flow of messages from a producer to a consumer:

template<typename T>
class Sync_queue {
public:
void put(const T& val);
void put(T&& val);
void get(T& val);
private:
mutex mtx;
condition_variable cond;
list<T> q;
};

The idea is that put() and get() will not get in each other’s way. A thread that does a get() will sleep unless there is a value on the queue for it to get.

template<typename T>
void Sync_queue<T>::put(const T& val)
{
lock_guard<mutex> lck(mtx);
q.push_back(val);
cond.notify_one();
}

That is, a producer put() acquires the queue’s mutex, adds a value at the end of the queue, calls notify_one() to wake a possibly blocked consumer, and implicitly releases the mutex. I provided an rvalue version of put() so that we can transmit objects of types that have move, but not copy, operations, such as unique_ptr (§5.2.1, §34.3.1) and packaged_task (§42.4.3).

I used notify_one() rather than notify_all() because I only added one element and wanted to keep put() simple. The possibility of multiple consumers and the possibility of consumers falling behind the producer might make me reconsider.

The get() is a bit more complicated because it should only block its thread if the mutex precludes access or if the queue is empty:

template<typename T>
void Sync_queue<T>::get(T& val)
{
unique_lock<mutex> lck(mtx);
cond.wait(lck,[this]{ return !q.empty(); });
val=q.front();
q.pop_front();
}

A caller of get() will remain blocked until the Sync_queue is nonempty.

I used a unique_lock rather than a plain lock_guard because the lock_guard is optimized for simplicity and does not offer the operations needed to unlock and relock the mutex.

I used [this] to enable the lambda to access the Sync_queue object (§11.4.3.3).

I return the value from get() through a reference argument, rather than as a return value, to be sure that an element type with a copy constructor that can throw will not cause trouble. That is the conventional technique (e.g., the STL stack adaptor provides pop() and the containers provide front()). Writing a general get() that directly returns a value is possible, but surprisingly tricky. For an example, see future<T>::get() (§42.4.4).

A simple producer-consumer pair can be very simple:

Sync_queue<Message> mq;

void producer()
{
while (true) {
Message m;
// ... fill m ...
mq.put(m);
}
}

void consumer()
{
while (true) {

Message m;
mq.get(m);
// ... use m ...
}
}
thread t1 {producer};
thread t2 {consumer};

Using a condition_variable saves a consumer the bother of explicitly dealing with the case where it runs out of work to do. Had we simply used a mutex to control access to the Sync_queue, the consumer would have had to repeatedly wake up, look for work on the queue, and decide what to do when it found the queue empty.
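
Putting the pieces together, the following is a self-contained sketch of the queue with one producer and one consumer. It makes a few assumptions that are not in the text: the member functions are defined in-class, get() moves the front element out so that move-only types work, the value 0 serves as an end-of-stream sentinel so that the threads terminate, and sum_through_queue() is a hypothetical driver.

```cpp
#include <condition_variable>
#include <list>
#include <mutex>
#include <thread>
#include <utility>

template<typename T>
class Sync_queue {
public:
    void put(const T& val)
    {
        std::lock_guard<std::mutex> lck {mtx};
        q.push_back(val);
        cond.notify_one();
    }
    void put(T&& val)
    {
        std::lock_guard<std::mutex> lck {mtx};
        q.push_back(std::move(val));
        cond.notify_one();
    }
    void get(T& val)
    {
        std::unique_lock<std::mutex> lck {mtx};
        cond.wait(lck,[this]{ return !q.empty(); });   // sleeps while the queue is empty
        val = std::move(q.front());
        q.pop_front();
    }
private:
    std::mutex mtx;
    std::condition_variable cond;
    std::list<T> q;
};

// The producer sends 1..n followed by the sentinel 0;
// the consumer adds up what it receives until it sees the sentinel.
long sum_through_queue(int n)
{
    Sync_queue<int> mq;
    long sum = 0;
    std::thread consumer {[&] {
        for (;;) {
            int m = 0;
            mq.get(m);
            if (m==0) break;
            sum += m;
        }
    }};
    std::thread producer {[&] {
        for (int i=1; i<=n; ++i) mq.put(i);
        mq.put(0);
    }};
    producer.join();
    consumer.join();    // after join() it is safe to read sum
    return sum;
}
```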

I copy values into and out of the list I use to hold my queue elements. A copy of an element type may throw an exception, but if it does, the Sync_queue will remain unchanged and the put() or get() simply fails.

A Sync_queue is not itself a shared data structure, so we don’t use a separate mutex for it; only the put() and get() (updating the head and the tail of the queue, which may be the same element) need to be protected against data races.

For some applications, the simple Sync_queue has a fatal flaw: What if a consumer waits forever because a producer stopped adding values? What if a consumer has other things to do so that it cannot wait for a long time? Often there are answers, but one common technique is to add a timeout to get(), that is, to specify a maximum time to wait:

void consumer()
{
while (true) {
Message m;
mq.get(m,milliseconds{200});
// ... use m ...
}
}

To make this work, we need to add a second get() to Sync_queue:

template<typename T>
void Sync_queue<T>::get(T& val, steady_clock::duration d)
{
unique_lock<mutex> lck(mtx);
bool not_empty = cond.wait_for(lck,d,[this]{ return !q.empty(); });
if (not_empty) {
val=q.front();
q.pop_front();
}
else
throw system_error{make_error_code(errc::timed_out),"Sync_queue: get() timeout"};
}

When using a timeout, we need to consider what to do after the wait: did we get data or did we just time out? Actually, we don’t really care about the timeout, but only whether the predicate (expressed in the lambda) is true or not, so that is what wait_for() returns. I chose to report the failure of a get() with a timeout by throwing an exception. Had I thought that timing out would be a common and “nonexceptional” event, I would have returned a bool instead.
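
For comparison, the bool-returning alternative might look like the following sketch. Timed_queue and demo_timeout_then_success() are hypothetical names, and the class is reduced to what the example needs.

```cpp
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>

template<typename T>
class Timed_queue {   // hypothetical name, not from the text
public:
    void put(const T& val)
    {
        std::lock_guard<std::mutex> lck {mtx};
        q.push_back(val);
        cond.notify_one();
    }

    // Returns false on timeout, leaving val unchanged.
    bool get(T& val, std::chrono::steady_clock::duration d)
    {
        std::unique_lock<std::mutex> lck {mtx};
        if (!cond.wait_for(lck,d,[this]{ return !q.empty(); }))
            return false;           // the predicate was still false when the time ran out
        val = q.front();
        q.pop_front();
        return true;
    }
private:
    std::mutex mtx;
    std::condition_variable cond;
    std::list<T> q;
};

// A get() on an empty queue times out; after a put(), get() succeeds.
bool demo_timeout_then_success()
{
    Timed_queue<int> tq;
    int v = -1;
    bool first = tq.get(v,std::chrono::milliseconds{10});
    tq.put(7);
    bool second = tq.get(v,std::chrono::milliseconds{10});
    return !first && second && v==7;
}
```

The caller must now test the result of every get(), which is the price of treating a timeout as an ordinary event.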

The roughly equivalent modification to put() would be to wait for the consumer to make inroads into a long queue, but not for too long a time:

template<typename T>
void Sync_queue<T>::put(T val, steady_clock::duration d, int n)
{
unique_lock<mutex> lck(mtx);
bool not_full = cond.wait_for(lck,d,[this,n]{ return q.size()<n; }); // n is a local, so it must be captured
if (not_full) {
q.push_back(val);
cond.notify_one();
}
else {
cond.notify_all();
throw system_error{make_error_code(errc::timed_out),"Sync_queue: put() timeout"};
}
}

For put(), the alternative of returning a bool to encourage the producer to always explicitly handle both cases seems more attractive than for get(). However, to avoid getting into a discussion of how best to handle overflow, I again chose to signal a failure by throwing an exception.

I chose to notify_all() if the queue was full. Maybe, some consumer needs a nudge to continue. The choice between notify_all() and notify_one() depends on the behavior of the application and is not always obvious. Notifying just one thread serializes access to the queue and could therefore minimize throughput when there are several potential consumers. On the other hand, notifying all waiting threads may wake up several threads, causing contention on the mutex and possibly having threads repeatedly waking up just to find the queue empty (emptied by other threads). I fall back on the old rule: Don’t trust your intuition; measure.

42.3.4.1. condition_variable_any

A condition_variable is optimized for unique_lock<mutex>. A condition_variable_any is functionally equivalent to a condition_variable but can use any lockable object for its operations.

42.4. Task-Based Concurrency

So far, this chapter focused on the mechanisms for running concurrent tasks: the focus was on threads, avoiding race conditions, and synchronizing threads. For many concurrent tasks, I find this focus on mechanisms distracting from the real task (sic!) of specifying concurrent tasks. This section focuses on specifying a simple kind of task: a task that does one thing given arguments and produces one result.

To support this task-based model of concurrency, the standard library offers future and promise (§42.4.1), packaged_task (§42.4.3), shared_future (§42.4.5), and async() (§42.4.6).

The presentation of these facilities exposes many details that rarely need to bother an application writer. Please keep in mind the fundamental simplicity of the task model. Most of the more complicated details support rare uses, such as hiding uses of the messier threads-and-locks level.

The standard-library task support is just one example of what can be done to support task-based concurrency. Often, we would like to provide a lot of small tasks and let “the system” worry about how to map their execution onto hardware resources and how to keep them out of problems with data races, spurious wake-ups, excessive waits, etc.

The importance of these facilities is their simplicity to a programmer. In a sequential program, we usually write something like:

res = task(args); // perform a task given arguments and get the result

The concurrent version becomes:

auto handle = async(task,args); // perform a task given arguments
// ... do something else ...
res = handle.get(); // get the result

Sometimes, we lose sight of the value of simplicity as we consider alternatives, details, performance, and tradeoffs. By default, use the simplest technique and reserve the more complex solutions for where you know that they are really worthwhile.

42.4.1. future and promise

As mentioned in §5.3.5, communication between tasks is handled by a future/promise pair. A task puts its result into a promise, and a task that needs the result retrieves the result from the corresponding future:

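[Figure: one task puts a value into a promise with set_value() or set_exception(); another task retrieves it from the corresponding future with get().]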

The “value” in this diagram is technically known as the shared state (§iso.30.6.4). In addition to the return value or exception, it contains the information needed for two threads to safely exchange the information. At a minimum, a shared state must be able to hold:

• A value of the appropriate type or an exception. For a future “returning void” the value is nothing.

• A ready bit to indicate whether a value or exception is ready to be extracted by a future.

• The task to be executed when a get() is called for a future for a task launched by async() with the launch policy deferred (§42.4.6).

• A use count, so that the shared state can be destroyed when and only when its last potential user relinquishes access. In particular, if a stored value is of a class with a destructor, its destructor is called when the use count goes to zero.

• Some mutual exclusion data to enable unblocking of any thread that might be waiting (e.g., a condition_variable).

An implementation can take actions on a shared state:

• Construct: Possibly using a user-supplied allocator.

• Make ready: Set the “ready bit” and unblock any waiting threads.

• Release: Decrease the use count and destroy the shared state if this was the last user.

• Abandon: If it becomes impossible for a value or exception to be put into the shared state by a promise (e.g., because the promise is destroyed), a future_error exception with the error condition broken_promise is stored in the shared state and the shared state is made ready.
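
The abandon action is easy to provoke. In this minimal sketch (abandoned_promise_reports_broken_promise() is a hypothetical name), a promise is destroyed before any value is set, and the future's get() then throws the stored future_error.

```cpp
#include <future>

// Returns true if get() on a future whose promise was destroyed
// without a value throws future_error with the code broken_promise.
bool abandoned_promise_reports_broken_promise()
{
    std::future<int> fut;
    {
        std::promise<int> pr;
        fut = pr.get_future();
    }   // pr is destroyed without a value: the shared state is abandoned and made ready
    try {
        fut.get();
    }
    catch (const std::future_error& e) {
        return e.code()==std::make_error_code(std::future_errc::broken_promise);
    }
    return false;
}
```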

42.4.2. promise

A promise is the handle to a shared state (§42.4.1). It is where a task can deposit its result to be retrieved through a future (§42.4.4).

There are no copy operations for a promise.

A set function throws future_error if a value or exception is already set.

It is only possible to transmit a single result value through a promise. That may seem restrictive, but remember that the value is moved into and out of the shared state, rather than copied, so that we can cheaply pass a collection of objects. For example:

promise<map<string,int>> pr;
map<string,int> m;
// ... fill m with a million <string,int> pairs ...
pr.set_value(move(m)); // move m into the shared state rather than copying it

A task may then extract that map from a corresponding future at essentially zero cost.
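
A complete round trip might look like the following sketch, in which one thread fills and deposits the map and the calling thread extracts it. The function name transmit_map() and the map contents are assumptions made for illustration.

```cpp
#include <cstddef>
#include <future>
#include <map>
#include <string>
#include <thread>
#include <utility>

// A producer thread builds a map of n entries and hands it over through a promise;
// the caller retrieves it from the future and returns its size.
std::size_t transmit_map(int n)
{
    std::promise<std::map<std::string,int>> pr;
    auto fut = pr.get_future();
    std::thread producer {[&pr,n] {
        std::map<std::string,int> m;
        for (int i=0; i!=n; ++i) m[std::to_string(i)] = i;
        pr.set_value(std::move(m));     // moved into the shared state
    }};
    auto result = fut.get();            // moved out of the shared state
    producer.join();
    return result.size();
}
```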

42.4.3. packaged_task

A packaged_task holds a task and a future/promise pair.

We pass a task (a function or a function object) that we want executed to a packaged_task. When our task executes a return x, it causes a set_value(x) on the packaged_task’s promise. Similarly, a throw x causes a set_exception(px) where px is an exception_ptr for x. Basically, the packaged_task executes its task, f(args), like this:

try {
pr.set_value(f(args)); // assume that the promise is called pr
}
catch(...) {
pr.set_exception(current_exception());
}

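A packaged_task offers a fairly conventional set of operations: construction from a task, move, swap(), valid(), get_future(), the call operator (), make_ready_at_thread_exit(), and reset().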

A packaged_task can be moved but not copied. A packaged_task may copy its task, and a copy of a task is assumed to yield the same result as the original. This is important because a task may be moved with its packaged_task onto the stack of a new thread.

To abandon a shared state (as is done by the destructor and the move) means making it ready. If there is no value or exception stored, a pointer to a future_error is stored (§42.4.1).

The advantage of make_ready_at_thread_exit() is that the result is not available until destructors for thread_local variables have been executed.

There is no get_promise() operation to match get_future(). The use of the promise is completely handled by the packaged_task.

For a really simple example we don’t even need any threads. First define a simple task:

int ff(int i)
{
if (i) return i;
throw runtime_error("ff(0)");
}

We can now package this function into packaged_tasks and call them:

packaged_task<int(int)> pt1 {ff}; // store ff in pt1
packaged_task<int(int)> pt2 {ff}; // store ff in pt2

pt1(1); // let pt1 call ff(1);
pt2(0); // let pt2 call ff(0);

So far, nothing appears to have happened. In particular, we don’t see the exception triggered by ff(0). In fact, pt1(1) did a set_value(1) on the promise attached to pt1, and pt2(0) did a set_exception(px) on the promise attached to pt2; that px is an exception_ptr to a runtime_error("ff(0)").

Later, we can try to retrieve the results. The get_future() operation is used to get hold of the future into which the packaged_task will deposit the result of its task.

auto v1 = pt1.get_future();
auto v2 = pt2.get_future();

try {
cout << v1.get() << '\n'; // will print
cout << v2.get() << '\n'; // will throw
}
catch (exception& e) {
cout << "exception: " << e.what() << '\n';
}

The output is:

1
exception: ff(0)

We could have gotten exactly the same effect by simply writing:

try {
cout << ff(1) << '\n'; // will print
cout << ff(0) << '\n'; // will throw
}
catch (exception& e) {
cout << "exception: " << e.what() << '\n';
}

The point is that the packaged_task version works exactly like the version using ordinary function calls even when the calls of the task (here ff) and the calls of the get()s are in different threads. We can concentrate on specifying the tasks, rather than thinking about threads and locks.

We can move the future, the packaged_task, or both around. Eventually, the packaged_task is invoked and its task deposits its result in the future without having to know either which thread executed it or which thread will receive the result. This is simple and general.
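
As a sketch of the multi-threaded case, the following moves a packaged_task into a new thread while the caller keeps the future. The function run_on_other_thread() is a hypothetical name; ff() is the task defined above, repeated so that the example is self-contained.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

int ff(int i)
{
    if (i) return i;
    throw std::runtime_error("ff(0)");
}

// Runs ff(arg) on another thread and reports what the caller's get() observes.
std::string run_on_other_thread(int arg)
{
    std::packaged_task<int(int)> pt {ff};
    auto fut = pt.get_future();             // keep the future before giving the task away
    std::thread t {std::move(pt),arg};      // the task is moved into the new thread
    std::string result;
    try {
        result = std::to_string(fut.get()); // value or exception arrives here
    }
    catch (const std::exception& e) {
        result = std::string{"exception: "} + e.what();
    }
    t.join();
    return result;
}
```

The exception thrown by ff(0) on the other thread surfaces in the caller's get(), just as it would from an ordinary call.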

Consider a thread that processes a series of requests. It could be a GUI thread, a thread owning access to a piece of specialized hardware, or indeed any server that serializes access to a resource through a queue. We can implement such a service as a queue of messages (§42.3.4), or we could pass tasks to be executed:

using Res = /* result type for server */;
using Args = /* argument types for server */;
using PTT = Res(Args);

Sync_queue<packaged_task<PTT>> server;

Res f(Args); // function: do something
struct G {
Res operator()(Args); // function object: do something
// ...
};
auto h = [=](Args a) { /* do something */ }; // lambda

packaged_task<PTT> job1(f);
packaged_task<PTT> job2(G{});
packaged_task<PTT> job3(h);

auto f1 = job1.get_future();
auto f2 = job2.get_future();
auto f3 = job3.get_future();

server.put(move(job1));
server.put(move(job2));
server.put(move(job3));

auto r1 = f1.get();
auto r2 = f2.get();
auto r3 = f3.get();

The server thread would take the packaged_tasks from the server queue and execute them in some suitable order. Typically, the tasks would carry data with them from the calling context.

The tasks are written essentially like ordinary functions, function objects, and lambdas. The server calls the tasks essentially like ordinary (callback) functions. The packaged_tasks are actually easier for the server to use than ordinary functions because the handling of their exceptions has been taken care of.

42.4.4. future

A future is a handle to a shared state (§42.4.1). It is where a task can retrieve a result deposited by a promise (§42.4.2).

A future holds a unique value and offers no copy operations.

The value, if any, is moved out of a future. So get() can only be called once. If you potentially need to read a result several times (e.g., by different tasks), use a shared_future (§42.4.5).

It is undefined what happens if you try to get() twice. In fact, it is undefined what happens if you try to do any operation except a first get(), a valid(), or a destructor on a future that is not valid(). The standard “encourages” an implementation to throw a future_error with the error condition future_errc::no_state in such cases.
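
A defensive caller can test valid() before touching a future. The following sketch (future_is_invalid_after_get() is a hypothetical name) shows the state change caused by get().

```cpp
#include <future>

// Returns true if the future is valid before get() and no longer valid afterward.
bool future_is_invalid_after_get()
{
    std::promise<int> pr;
    std::future<int> fu = pr.get_future();
    pr.set_value(1);
    bool before = fu.valid();   // true: fu refers to a shared state
    int v = fu.get();           // moves the value out and releases the shared state
    return before && v==1 && !fu.valid();
}
```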

If a future<T>’s value type, T, is void or a reference, special rules apply for get():

• future<void>::get() doesn’t return a value: it just returns or throws an exception.

• future<T&>::get() returns a T&. A reference isn’t an object, so the library must have transmitted something else, such as a T*, and get() converts that (back) into a T&.

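The status of a future can be observed by calling wait_for() and wait_until(). Each returns a future_status: ready (the future has a value), timeout (the wait timed out), or deferred (the task is deferred and will not run until a get()).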

The possible errors from operations on futures are the future_errc values broken_promise, future_already_retrieved, promise_already_satisfied, and no_state.

In addition, an operation on the T value of shared_future<T>::get() could possibly throw (e.g., an unusual move operation).

Looking at the future<T> table, I find that I miss two useful functions:

• wait_for_all(args): Wait until every future in args has a value.

• wait_for_any(args): Wait until one future in args has a value.

I can easily implement a version of wait_for_all():

template<typename T>
vector<T> wait_for_all(vector<future<T>>& vf)
{
vector<T> res;
for (auto& fu : vf)
res.push_back(fu.get());
return res;
}

That’s easy enough to use, but it has a flaw: if I wait for ten futures, I risk my thread getting blocked ten times. Ideally, my thread would be blocked and unblocked at most once. However, for many uses, this wait_for_all() implementation is good enough: if some of the tasks are long-running, the extra waits will not be significant. On the other hand, if all tasks are short, they will most likely have finished after the first wait.
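
A usage sketch, assuming the tasks are launched with async() and an explicit launch::async policy; squares() is a hypothetical driver, and wait_for_all() is repeated so that the example is self-contained.

```cpp
#include <future>
#include <vector>

template<typename T>
std::vector<T> wait_for_all(std::vector<std::future<T>>& vf)
{
    std::vector<T> res;
    for (auto& fu : vf)
        res.push_back(fu.get());    // may block once per future
    return res;
}

// Launches n small tasks and collects their results in launch order.
std::vector<int> squares(int n)
{
    std::vector<std::future<int>> vf;
    for (int i=0; i!=n; ++i)
        vf.push_back(std::async(std::launch::async,[i]{ return i*i; }));
    return wait_for_all(vf);
}
```

The results come back in the order of the futures in the vector, not in the order in which the tasks happened to finish.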

An implementation of wait_for_any() is trickier. First we need a way of checking if a future is ready. Surprisingly, that is done by using wait_for(). For example:

future_status s = fu.wait_for(seconds{0});

Using wait_for(seconds{0}) to get the status of a future is not obvious, but wait_for() will tell us why it resumed, and it tests for ready before suspending. It is common, but unfortunately not guaranteed, that wait_for(seconds{0}) returns immediately rather than trying to suspend for zero time.
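
The test can be wrapped in a small helper. In this sketch, is_ready() and ready_only_after_set_value() are hypothetical names; the helper must only be applied to a future that is valid().

```cpp
#include <chrono>
#include <future>

// Reports whether fu has a value or exception ready, without waiting for one.
// Precondition: fu.valid()
template<typename T>
bool is_ready(const std::future<T>& fu)
{
    return fu.wait_for(std::chrono::seconds{0})==std::future_status::ready;
}

// A future is not ready before its promise is given a value and is ready afterward.
bool ready_only_after_set_value()
{
    std::promise<int> pr;
    auto fu = pr.get_future();
    bool before = is_ready(fu);
    pr.set_value(42);
    bool after = is_ready(fu);
    return !before && after;
}
```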

Given wait_for(), we can write:

template<typename T>
int wait_for_any(vector<future<T>>& vf, steady_clock::duration d)
// return index of ready future
// if no future is ready, wait for d before trying again
{
while(true) {
for (int i=0; i!=vf.size(); ++i) {
if (!vf[i].valid()) continue;
switch (vf[i].wait_for(seconds{0})) {
case future_status::ready:
return i;
case future_status::timeout:
break;
case future_status::deferred:
throw runtime_error("wait_for_any(): deferred future");
}
}
this_thread::sleep_for(d);
}
}

I decided to consider a deferred task (§42.4.6) an error for my uses.

Note the check for valid(). Trying wait_for() on an invalid future (e.g., a future on which you have already done a get()) will cause a hard-to-find error. At best, you can hope for a (probably surprising) exception being thrown.

Like the implementation of wait_for_all(), this implementation has a flaw: ideally, the caller of wait_for_any() should never have to wake up just to find that no tasks had completed and should be unblocked immediately when one does. This simple implementation only approximates that. With a large d a useless wake-up is unlikely but implies the possibility of an unnecessarily long wait.

The wait_for_all() and wait_for_any() functions are useful building blocks for concurrent algorithms. I use them in §42.4.6.

42.4.5. shared_future

The result value of a future can be read only once: it is moved. Thus, if you want to read the value repeatedly or potentially have it read by multiple readers, you must copy it, and then read the copy. That’s what a shared_future does. Every usable shared_future is directly or indirectly initialized by moving the value out of a future with the same result type.

Obviously, shared_future is very similar to future. The key difference is that a shared_future moves its value to a location where it can be repeatedly read and shared. As for future<T>, special rules apply for get() when a shared_future<T>’s value type, T, is void or a reference:

• shared_future<void>::get() doesn’t return a value: it just returns or throws an exception.

• shared_future<T&>::get() returns a T&. A reference isn’t an object, so the library must have transmitted something else, such as a T*, and get() converts that (back) into a T&.

• shared_future<T>::get() returns a const T& when T is not a reference.

Unless the returned object is a reference, it is const, so it can safely be accessed from several threads without synchronization. If the returned object is a non-const reference, you need some form of mutual exclusion to avoid data races on the referred-to object.
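
A sketch of several readers sharing one result, assuming each reader holds its own copy of the shared_future; sum_of_reads() is a hypothetical name.

```cpp
#include <future>
#include <vector>

// nreaders tasks each read the same value (7) through their own copy of a shared_future.
int sum_of_reads(int nreaders)
{
    std::promise<int> pr;
    std::shared_future<int> sf = pr.get_future().share();   // move the future's state into a shared_future
    std::vector<std::future<int>> readers;
    for (int i=0; i!=nreaders; ++i)
        readers.push_back(std::async(std::launch::async,[sf]{ return sf.get(); }));
    pr.set_value(7);                // unblocks every reader
    int sum = 0;
    for (auto& r : readers) sum += r.get();
    return sum;
}
```

Unlike future::get(), shared_future::get() can be called any number of times, by any number of copies.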

42.4.6. async()

Given future and promise (§42.4.1) and packaged_task (§42.4.3), we can write simple tasks without worrying too much about threads. Given those, a thread is just something you give a task to run. However, we still need to consider how many threads to use and whether a task is best run on the current thread or on another. Such decisions can be delegated to a thread launcher, that is, a function that decides whether to create a new thread, to recycle an old thread, or simply run the task on the current thread.

The async() function is basically a simple interface to a launcher of unknown sophistication. A call of async() returns a future<R> where R is the type of its task’s result. For example:

double square(int i) { return i*i; }

future<double> fd = async(square,2);
double d = fd.get();

If a thread is launched to execute square(2), we may have a record slow way of executing 2*2. The notation can be simplified by the use of auto:

double square(int i) { return i*i; }

auto fd = async(square,2);
auto d = fd.get();

In principle, a caller of async() could provide a huge variety of information to help the implementation of async() decide whether to launch a new thread, rather than simply executing the task on the current thread. For example, we can easily imagine a programmer wanting to give the launcher a hint about how long a task is likely to run. However, only two policies are currently standard:

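• launch::async: execute the task as if a new thread was created to do so.

• launch::deferred: execute the task at the point of a get() for the task’s future.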

Note the as if. The launcher has wide discretionary powers when it comes to launching a new thread or not. For example, since the default policy is async|deferred (async or deferred), it is not too fanciful to imagine an async() that decided to use deferred for async(square,2), so that the execution reduced to fd.get() calling square(2). I could even imagine an optimizer reducing that whole code fragment to

double d = 4;

However, we should not expect an implementation of async() to be optimized for such trivial examples. Implementer efforts are better spent on realistic examples where the task performs a significant amount of computation so that launching on a new or “recycled” thread could be reasonably considered.

By a “recycled thread” I mean a thread from a collection of threads (a thread pool) that async() may create once and use over and over to execute a variety of tasks. Depending on the implementation of system threads, this can drastically lower the cost of executing a task on a thread. If a thread is recycled, the launcher must take care that a task does not see leftover state from a previous task executed on the thread and that a task does not store pointers to its stack or thread_local data (§42.2.8) in nonlocal storage. Such data could conceivably be used for security violations.

A simple and realistic use of async() would be to spawn a task to collect input from a user:

void user()
{
auto handle = async([](){ return input_interaction_manager(); });
// ...
auto input = handle.get();
// ...
}

Such a task often requires some data from the caller. I used a lambda to make it obvious that I can pass arguments or allow access to local variables. When using a lambda to specify a task, beware of capturing local variables by reference. That could lead to data races or unfortunate cache access patterns by two threads accessing the same stack frame. Also, beware that capturing members of an object using [this] (§11.4.3.3) implies that the members of the object are accessed indirectly (through this), rather than copied, so that the object is subject to data races unless you make certain that it is not. If in doubt, copy (pass or capture by value, [=]).

It is often important that we can select a scheduling policy “late” and change it as needed. For example, I might use launch::deferred for initial debugging. That would eliminate errors related to concurrency until I had eliminated sequential errors. Also, I can often go back to launch::deferred to determine if an error really is related to concurrency.

Over time, more launch policies may become available, and maybe some systems offer better launch policies than others. In such cases, I might be able to improve the performance of my code by a local change of launch policy, rather than by reworking subtle details of the program logic. This, again, is an effect of the fundamental simplicity of the task-based model (§42.4).

Having launch::async|launch::deferred as the default launch policy can be a practical problem. Basically, it is not so much a default as a lacking design decision. An implementation might decide that “no concurrency” is a good idea and always use launch::deferred. If your experiments with concurrency show results surprisingly similar to single-thread execution, try being explicit about the launch policy.
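
One way to see which policy is in effect is to compare thread identities. In this minimal sketch, runs_on_calling_thread() is a hypothetical helper: a deferred task runs on the thread that calls get(), whereas a launch::async task runs as if on a new thread.

```cpp
#include <future>
#include <thread>

// Returns true if a task launched with the given policy ran on the calling thread.
bool runs_on_calling_thread(std::launch policy)
{
    auto caller = std::this_thread::get_id();
    auto fut = std::async(policy,[]{ return std::this_thread::get_id(); });
    return fut.get()==caller;
}
```

Applied to the default policy, launch::async|launch::deferred, the answer depends on the implementation, which is exactly the problem described above.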

42.4.7. A Parallel find() Example

A find() does a linear search of a sequence. Imagine having millions of items that are not easily sorted so that find() is the right algorithm for finding something. This could be slow, so instead of searching once starting at the beginning and going until the end, we might start 100 find()s each on a hundredth of the data.

First, we represent the data as a vector of Records:

extern vector<Record> goods; // data to be searched

An individual (sequential) task is simply a use of the standard-library find_if():

template<typename Pred>
Record* find_rec(vector<Record>& vr, int first, int last, Pred pr)
{
    vector<Record>::iterator p = std::find_if(vr.begin()+first,vr.begin()+last,pr);
    if (p == vr.begin()+last)
        return nullptr; // at end: no record found
    return &*p; // found: return a pointer to the element
}

Unfortunately, we have to decide on a “grain” of parallelism. That is, we need to specify the number of records to be searched sequentially.

const int grain = 50000; // number of records for a linear search

Picking a number like that is a very primitive way of choosing a grain size. It is hard to choose well unless a lot is known about the hardware, the library implementation, the data, and the algorithm. Experimentation is essential. Tools and frameworks that save us from having to pick a grain size or help us choose can be most useful. However, for a simple illustration of basic standard-library facilities and the most basic techniques for their use, grain is sufficient.

The pfind() (“parallel find”) function simply does the number of async() calls required by the grain and the number of Records. Then, it get()s the results:

template<typename Pred>
Record* pfind(vector<Record>& vr, Pred pr)
{
    assert(vr.size()%grain==0);

    vector<future<Record*>> res;

    for (int i = 0; i!=vr.size(); i+=grain)
        res.push_back(async(find_rec<Pred>,ref(vr),i,i+grain,pr));

    for (int i = 0; i!=res.size(); ++i) // look for a result in the futures
        if (auto p = res[i].get()) // did the task find a match?
            return p;

    return nullptr; // no match found
}

Finally, we can initiate a search:

void find_cheap_red()
{
    assert(goods.size()%grain==0);

    Record* p = pfind(goods,
        [](Record& r) { return r.price<200 && r.color==Color::red; });
    if (p) // pfind() returns nullptr if no record matches
        cout << "record "<< *p << '\n';
    else
        cout << "not found\n";
}

This first version of a parallel find() first spawns a lot of tasks and then proceeds to wait for them in order. Like std::find_if(), it reports the first element that matches the predicate; that is, it finds the element with the lowest index that matches. That may be fine, but:

• We could end up waiting for a lot of tasks that don’t find anything (maybe only the last task finds something).

• We may throw away a lot of information that could be useful (maybe a thousand items match our criteria).

The first problem may not be as bad as it sounds. Assume (somewhat recklessly) that launching a thread doesn’t cost anything and that we have as many processing units as there are tasks; then we would still get the result in roughly the time it took to process one task. That is, we would potentially get our result in the time taken to examine 50,000 records rather than millions. If we have N processing units, the results will be delivered in batches of results for N*50000 records. If no record is found until the last segment of the vector, the time will be roughly vr.size()/(N*grain) units.
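For experimentation, the first version can be assembled into a self-contained sketch. This is a variation on the code above rather than a transcription of it: the Record type shown is a hypothetical stand-in, the grain is passed as a parameter instead of being a global constant, and the last chunk is clamped so that the size of the vector need not be a multiple of the grain.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <vector>

struct Record {          // hypothetical record type, for illustration only
    int price;
    bool red;
};

// Sequential search of the half-open index range [first,last).
template<typename Pred>
Record* find_rec(std::vector<Record>& vr, std::size_t first, std::size_t last, Pred pr)
{
    auto b = vr.begin() + first;
    auto e = vr.begin() + last;
    auto p = std::find_if(b, e, pr);
    return p == e ? nullptr : &*p;
}

// Parallel find: one task per chunk of at most `grain` records.
template<typename Pred>
Record* pfind(std::vector<Record>& vr, std::size_t grain, Pred pr)
{
    assert(grain > 0);
    std::vector<std::future<Record*>> res;
    for (std::size_t i = 0; i < vr.size(); i += grain) {
        std::size_t last = std::min(i + grain, vr.size());   // clamp the final chunk
        res.push_back(std::async(find_rec<Pred>, std::ref(vr), i, last, pr));
    }
    for (auto& f : res)               // wait for the tasks in index order
        if (Record* p = f.get())
            return p;                 // lowest-index match, as with std::find_if()
    return nullptr;                   // no match found
}
```

A call such as pfind(goods, 50000, pred) then behaves like the version in the text. Note that returning early does not cancel the remaining tasks: the futures still held in res are destroyed on return, and for tasks that async() started on their own threads that destruction waits for the task to finish.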

Instead of waiting for each task in order, we could try to look at the results in the order the tasks completed. That is, we could use wait_for_any() (§42.4.4). For example:

template<typename Pred>
Record* pfind_any(vector<Record>& vr, Pred pr)
{
    vector<future<Record*>> res;

    for (int i = 0; i!=vr.size(); i+=grain)
        res.push_back(async(find_rec<Pred>,ref(vr),i,i+grain,pr));

    for (int count = res.size(); count; --count) {
        int i = wait_for_any(res,microseconds{10}); // find a completed task
        if (auto p = res[i].get()) // did the task find a match?
            return p;
    }

    return nullptr; // no match found
}

A get() renders its future invalid, so we don’t get to look at a partial result twice.

I use count to make sure I don’t keep looking after all tasks have reported back. Apart from that, pfind_any() is as simple as pfind(). Whether there is a performance advantage to pfind_any() over pfind() depends on lots of things, but the key observation is that to (potentially) gain advantages of concurrency, we had to use a slightly different algorithm. Like find_if(), pfind() returns its first match, whereas pfind_any() returns whichever match it first found. Often, the best parallel algorithm for a problem is a variant of the idea for a sequential solution, rather than a simple repetition of the sequential solution.
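Note that wait_for_any() is not a standard-library function. The following is a sketch of how such a helper could be written by polling each future with wait_for(); it assumes the interface used by pfind_any() (a vector of futures and a sleep interval, returning an index) and may differ in detail from the version the text refers to:

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <stdexcept>
#include <thread>
#include <vector>

// Return the index of a future in vf whose result is ready.
// If none is ready yet, sleep for d and poll again.
template<typename T>
int wait_for_any(std::vector<std::future<T>>& vf, std::chrono::steady_clock::duration d)
{
    // Precondition: at least one future is still valid; otherwise we would loop forever.
    assert(std::any_of(vf.begin(), vf.end(),
                       [](const std::future<T>& f) { return f.valid(); }));
    while (true) {
        for (std::size_t i = 0; i != vf.size(); ++i) {
            if (!vf[i].valid()) continue;      // already consumed by an earlier get()
            switch (vf[i].wait_for(std::chrono::seconds{0})) {
            case std::future_status::ready:
                return static_cast<int>(i);
            case std::future_status::timeout:
                break;                         // still running: try the next one
            case std::future_status::deferred:
                throw std::runtime_error("wait_for_any(): deferred future");
            }
        }
        std::this_thread::sleep_for(d);
    }
}
```

A deferred task never becomes ready until someone calls get() or wait() on its future, so polling it would never succeed; this sketch throws instead. That matters for code that calls async() without a policy: if the implementation chooses launch::deferred, a polling helper like this one cannot make progress, which is one more reason to be explicit about the launch policy.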

In this case, the obvious question is “But do you really only need one match?” Given concurrency, it makes more sense to find all matches. Doing so is easy. All we need to do is to let each task return a vector of matches, rather than just a simple match:

template<typename Pred>
vector<Record*> find_all_rec(vector<Record>& vr, int first, int last, Pred pr)
{
    vector<Record*> res;
    for (int i=first; i!=last; ++i)
        if (pr(vr[i]))
            res.push_back(&vr[i]);
    return res;
}

This find_all_rec() is arguably simpler than the original find_rec().

Now we just need to launch find_all_rec() a suitable number of times and wait for the results:

template<typename Pred>
vector<Record*> pfind_all(vector<Record>& vr, Pred pr)
{
    vector<future<vector<Record*>>> res;

    for (int i = 0; i!=vr.size(); i+=grain)
        res.push_back(async(find_all_rec<Pred>,ref(vr),i,i+grain,pr));

    vector<vector<Record*>> r2 = wait_for_all(res);

    vector<Record*> r;
    for (auto& x : r2) // merge results
        for (auto p : x)
            r.push_back(p);
    return r;
}

Had I just returned a vector<vector<Record*>> this pfind_all() would have been the simplest parallelizing function so far. However, by merging the vectors returned into a single one, pfind_all() became an example of a common and popular group of parallel algorithms:

[1] Create a number of tasks to be run.

[2] Run the tasks in parallel.

[3] Merge the results.

This is the basic idea that, when developed into a framework so that the details of concurrent execution have been completely hidden, is commonly referred to as map-reduce [Dean,2004].

The example can be run like this:

void find_all_cheap_red()
{
    assert(goods.size()%grain==0);

    auto vp = pfind_all(goods,
        [](Record& r) { return r.price<200 && r.color==Color::red; });
    for (auto p : vp)
        cout << "record "<< *p << '\n';
}
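The pfind_all() function relies on wait_for_all(), which is not a standard-library function. A minimal sketch of such a helper, assuming it simply collects the results in the order of the futures, could look like this:

```cpp
#include <cassert>
#include <future>
#include <vector>

// Wait for every task and collect the results, in the order of the futures.
template<typename T>
std::vector<T> wait_for_all(std::vector<std::future<T>>& vf)
{
    std::vector<T> res;
    res.reserve(vf.size());
    for (auto& fu : vf) {
        assert(fu.valid());        // get() on an invalid future is undefined
        res.push_back(fu.get());   // blocks until the task is done; rethrows its exception
    }
    return res;
}
```

Because get() runs a deferred task on the spot, this helper works whichever launch policy the implementation picked. If a task threw, the exception propagates out of wait_for_all() and the results collected so far are discarded.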

Finally, we must consider if the effort to parallelize was worthwhile. To do so, I added simple sequential versions to my test:

void just_find_cheap_red()
{
    auto p = find_if(goods.begin(),goods.end(),
        [](Record& r) { return r.price<200 && r.color==Color::red; });
    if (p!=goods.end())
        cout << "record "<< *p << '\n';
    else
        cout << "not found\n";
}

void just_find_all_cheap_red()
{
    auto vp = find_all_rec(goods,0,goods.size(),
        [](Record& r) { return r.price<200 && r.color==Color::red; });
    for (auto p : vp)
        cout << "record "<< *p << '\n';
}

For my simple test data and my (relatively) simple laptop with only four hardware threads, I did not find any consistent or significant performance differences. In this case, the cost of thread creation in the immature implementation of async() dominates the effects of concurrency. If I needed significant parallel speedup right now, I would implement my own variant of async() based on a pre-created set of threads and a work queue, along the lines of a Sync_queue (§42.3.4) of packaged_tasks (§42.4.3). Note that such a significant optimization can be done without changing my task-based parallel find() program. From the application’s point of view, replacing the standard-library async() with an optimized version is an implementation detail.
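To make the idea of pre-created threads and a work queue concrete, here is a rough sketch. It is not the Sync_queue-based design mentioned above: Task_pool and submit() are hypothetical names, the queue is a plain std::queue guarded by a mutex and a condition variable, and error handling (for example, a thread that fails to start) is omitted.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fixed-size thread pool; a sketch, not a production design.
class Task_pool {
public:
    explicit Task_pool(std::size_t n)
    {
        assert(n > 0);
        for (std::size_t i = 0; i != n; ++i)
            workers.emplace_back([this] { run(); });
    }

    ~Task_pool()
    {
        {
            std::lock_guard<std::mutex> lck{mtx};
            done = true;
        }
        cond.notify_all();
        for (auto& t : workers)
            t.join();                        // workers drain the queue before exiting
    }

    Task_pool(const Task_pool&) = delete;
    Task_pool& operator=(const Task_pool&) = delete;

    // Queue f for execution on a pooled thread; the result arrives through the future.
    template<typename F>
    auto submit(F f) -> std::future<decltype(f())>
    {
        using R = decltype(f());
        // std::function needs a copyable callable, so hold the packaged_task by shared_ptr.
        auto pt = std::make_shared<std::packaged_task<R()>>(std::move(f));
        std::future<R> fut = pt->get_future();
        {
            std::lock_guard<std::mutex> lck{mtx};
            work.push([pt] { (*pt)(); });
        }
        cond.notify_one();
        return fut;
    }

private:
    void run()
    {
        while (true) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lck{mtx};
                cond.wait(lck, [this] { return done || !work.empty(); });
                if (work.empty())
                    return;                  // shutting down and nothing left to do
                job = std::move(work.front());
                work.pop();
            }
            job();                           // run the task outside the lock
        }
    }

    std::mutex mtx;
    std::condition_variable cond;
    std::queue<std::function<void()>> work;
    bool done = false;
    std::vector<std::thread> workers;
};
```

With such a pool, a call like pool.submit([&]{ return find_rec(vr,i,i+grain,pr); }) could take the place of the async() call in pfind(), leaving the rest of the function unchanged. Unlike futures obtained from async(), the futures returned here do not wait for their task when destroyed. Whether the pool actually pays off has to be measured.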

42.5. Advice

[1] A thread is a type-safe interface to a system thread; §42.2.

[2] Do not destroy a running thread; §42.2.2.

[3] Use join() to wait for a thread to complete; §42.2.4.

[4] Consider using a guarded_thread to provide RAII for threads; §42.2.4.

[5] Do not detach() a thread unless you absolutely have to; §42.2.4.

[6] Use lock_guard or unique_lock to manage mutexes; §42.3.1.4.

[7] Use lock() to acquire multiple locks; §42.3.2.

[8] Use condition_variables to manage communication among threads; §42.3.4.

[9] Think in terms of tasks that can be executed concurrently, rather than directly in terms of threads; §42.4.

[10] Value simplicity; §42.4.

[11] Return a result using a promise and get a result from a future; §42.4.1.

[12] Don’t set_value() or set_exception() to a promise twice; §42.4.2.

[13] Use packaged_tasks to handle exceptions thrown by tasks and to arrange for value return; §42.4.3.

[14] Use a packaged_task and a future to express a request to an external service and wait for its response; §42.4.3.

[15] Don’t get() twice from a future; §42.4.4.

[16] Use async() to launch simple tasks; §42.4.6.

[17] Picking a good granularity of concurrent tasks is difficult: experiment and measure; §42.4.7.

[18] Whenever possible, hide concurrency behind the interface of a parallel algorithm; §42.4.7.

[19] A parallel algorithm may be semantically different from a sequential solution to the same problem (e.g., pfind_all() vs. find()); §42.4.7.

[20] Sometimes, a sequential solution is simpler and faster than a concurrent solution; §42.4.7.