Threads and Concurrency - THE RUBY WAY, Third Edition (2015)

THE RUBY WAY, Third Edition (2015)

Chapter 13. Threads and Concurrency

He draweth out the thread of his argument finer than the staple of his verbosity.

—William Shakespeare,
Love’s Labours Lost, Act V, Sc. 1

One of the wonders of modern computers is that they appear to be able to do more than one thing at a time. Even back in the days of single-core processes, the lone CPU could switch between its various jobs quickly enough that it appeared to be doing many things at once. With the advent of inexpensive computers with multiple CPUs, that illusion has become reality. Thus, we take it for granted that we can simultaneously download a gem while we surf the Internet and print a document. This concurrency is exposed to programmers in two fundamental forms: processes and threads. In general, a process is a fairly self-sufficient entity, with its own flow of control and its own address space, safely out of reach of other processes (with important exceptions—an operating system must not only protect processes from each other but also provide mechanisms for sharing memory).

Threads are sometimes called lightweight processes. They are a way to achieve concurrency without all the overhead of switching tasks at the operating system level. Although the computing community is not in perfect agreement about the definition of threads, we’ll use the most common definition here.

A thread generally lives inside a process, sharing its address space and open files. The good news about threads is that because they do share so much with their parent process, they are relatively cheap to create. The bad news about threads is that because they share so much with their parent process—and any other threads within that process—they have a tendency to interfere with each other.

Threads are useful in circumstances where separate pieces of code naturally function independently of each other. They are also useful when an application spends much of its time waiting for an event. Often while one thread is waiting, another can be doing useful processing. On the other hand, there are some potential disadvantages in the use of threads. Switching between threads can reduce the speed at which each individual task runs. In some cases, access to a resource is inherently serialized so that threading doesn’t help. Sometimes, the overhead of synchronizing access to resources exceeds the savings due to multithreading.

Threads always incur some kind of performance hit, both in memory and in execution time; it is easy to pass the point of diminishing returns. For these and other reasons, some authorities claim that threaded programming is to be avoided. Indeed, concurrent code can be complex, error prone, and difficult to debug. But with so many CPU cores available in modern computers, the increase in speed for concurrent tasks can be enormous. We leave it to the reader to decide when it is worthwhile to use these techniques.

The difficulties associated with unsynchronized threads are well known. A data structure can be corrupted by threads attempting simultaneous access to the data. Race conditions may occur wherein one thread makes some assumption about what another has done already; these commonly result in “nondeterministic” code that may run differently with each execution. Finally, there is the danger of deadlock, wherein no thread can continue because it is waiting for a resource held by some other thread, which is also blocked. Code written to avoid these problems is referred to asthread-safe code.

Using threads in Ruby presents some additional challenges. The main version of Ruby (MRI, Matz’s Ruby Interpreter) creates threads in the usual way. But—and this is a huge but—it still contains legacy code that is not thread-safe. As a result, it only allows one thread to run at a time. The result is low overhead, but very low concurrency. Multiple threads are only useful in MRI when a program will spend time waiting for I/O.

The story is different if you happen to be using the Rubinius or JRuby interpreters. JRuby is the implementation of Ruby for the Java Virtual Machine, while Rubinius is an experimental interpreter that attempts to implement Ruby using as much Ruby as possible. JRuby’s threads rely on the threading supplied by the underlying Java platform, while Rubinius uses operating system level threads like MRI does. Unlike MRI, both Rubinius and JRuby do not artificially synchronize their threads. This means that in Ruby interpreters other than MRI, threads can run at the same time, using all of the processors available on the machine.

A big part of understanding threads is knowing the synchronization methods that allow you to control access to variables and resources, protect critical sections of code, and avoid deadlock. We will examine each of these techniques and illustrate them with code.

13.1 Creating and Manipulating Threads

The most basic operations on threads include creating a thread, passing information in and out, waiting for a thread, and stopping a thread. We can also obtain lists of threads, check the state of a thread, and check various other information.

13.1.1 Creating Threads

Every program you have ever run had at least one thread associated with it—the initial flow of control that we call the “main” thread. Multithreading happens when we start other threads.

Creating a thread in Ruby is easy. Simply call the new method and attach a block that will be the body of the thread:

thread = Thread.new do
# Statements comprising
# the thread...
end

The method fork is an alias for new, so we could have created our thread by calling Thread.fork instead of Thread.new. No matter which name you use, the value returned is an instance of Thread. You can use the Thread instance to control the thread.

If you want to pass parameters into your thread, you can do so by passing them into Thread.new:

thread2 = Thread.new(99, 100) do |a, b|
# a is a parameter that starts out equal to 99
# b is also a parameter which starts out at 100
end

Thus, the thread block in the preceding example takes two parameters, a and b, which start out as 99 and 100, respectively.

Because threads are built around ordinary code blocks, they can also access variables from the scope in which they were created. Therefore, the following thread can change the values of x and y because those two variables were in scope when we created the block associated with the thread:

x = 1
y = 2

thread3 = Thread.new(99, 100) do |a, b|
# x and y are visible and can be changed!
x = 10
y = 11
end

puts x, y

The danger in doing this sort of thing lies in the question of when. When exactly do the values of x and y change? We can say that they will change sometime after thread3 starts, but that is pretty much all we can say. Once a thread starts, it takes on a life of its own, and unless you do something to synchronize with it, you will have no idea of how fast it will run. Because of this unpredictability, it’s hard to say what the last example will print out. If thread3 finishes very rapidly, we might well see 10 and 11 printed. Conversely, if thread3 is slow off the mark, we might see 1 and 2 printed. There is even a small chance that we might see a 1 followed by an 11 printed if the race between thread3 and the main thread is very close. In real programs, this is exactly the kind of random behavior we would like to avoid.

13.1.2 Accessing Thread-Local Variables

So how do you go about building threads that can share results with the rest of the program without stepping all over variables from outside their scope? Fortunately, Ruby provides us with a special mechanism for this purpose. The trick is that you can treat a thread instance like a hash, setting and getting values by key. This thread-local data can be accessed from both inside and outside of the thread and provide a convenient sharing mechanism between the two. Here is an example of thread-local data in action:

thread = Thread.new do
t = Thread.current
t[:var1] = "This is a string"
t[:var2] = 365
end

sleep 1 # Let the thread spin up

# Access the thread-local data from outside...

x = thread[:var1] # "This is a string"
y = thread[:var2] # 365

has_var2 = thread.key?(:var2) # true
has_var3 = thread.key?(:var3) # false

As you can see from the code, a thread can set its own local data by calling Thread.current to get its own Thread instance and then treating that object like a hash. Thread instances also have a convenient key? instance method that will tell you if a particular key is set. One other nice thing about thread-local data is that it hangs around even after the thread has expired.

Note that thread-local data is not the same as the local variables inside the thread block. The local variable var3 and thread[:var3] may look somewhat alike, but they are definitely not the same:

thread = Thread.new do
t = Thread.current
t["var3"] = "thread local!!"
var3 = "a regular local"
end

sleep 1 # Let the thread spin up

a = thread[:var3] # "thread local!!"

You should also keep in mind that although threads with their thread-local data do act a bit like hashes, they are definitely not the real thing: Threads lack most of the familiar Enumerable methods such as each and find. More significantly, threads are picky about the keys you are allowed to use for the thread-local data. Besides symbols, your only other choice for a thread local key is a string, and if you do use a string, it will get silently converted to a symbol. We can see all of this going on here:

thread = Thread.new do
t = Thread.current
t["started_as_a_string"] = 100
t[:started_as_a_symbol] = 101
end

sleep 1 # Let the thread spin up

a = thread[:started_as_a_string] # 100
b = thread["started_as_a_symbol"] # 101

It’s important to note that while thread-local data can help solve the where problem by giving you a thread-specific place to put data, thread-local data alone doesn’t help with the when problem: Absent some kind of synchronization, you will have no idea when a thread will get around to doing something with its thread-local data. The common case is to “return” thread-local data as the thread finishes executing. No synchronization is needed then because the thread has terminated.

13.1.3 Querying and Changing Thread Status

The Thread class sports a number of useful class methods for managing threads. For example, the list method returns an array of all living threads, whereas the main method returns a reference to the main thread, the one that kicks everything off. And as you have seen, there is also acurrent method that allows a thread to find its own identity.

t1 = Thread.new { sleep 100 }

t2 = Thread.new do
if Thread.current == Thread.main
puts "This is the main thread." # Does NOT print
end
1.upto(1000) { sleep 0.1 }
end

count = Thread.list.size # 3

if Thread.list.include?(Thread.main)
puts "Main thread is alive." # Always prints!
end

if Thread.current == Thread.main
puts "I'm the main thread." # Prints here...
end

The exit, pass, start, stop, and kill methods are used to control the execution of threads, either from inside or outside:

# In the main thread...
Thread.kill(t1) # Kill thread t1 from the previous example
Thread.pass # Give up my timeslice
t3 = Thread.new do
sleep 20
Thread.exit # Exit the current thread
puts "Can't happen!" # Never reached
end

Thread.kill(t2) # Now kill t2

# Now exit the main thread (killing any others)
Thread.exit

Note that there is no instance method stop, so a thread can stop itself but not another thread.

There are also various methods for checking the state of a thread. The instance method alive? will tell whether the thread is still “living” (not exited), and stop? will return true if the thread is either dead or sleeping. Note that alive? and stop? are not really opposites: Both will return true if the thread is sleeping because a sleeping thread is a) still alive and b) not doing anything at the moment:

count = 0
t1 = Thread.new { loop { count += 1 } }
t2 = Thread.new { Thread.stop }

sleep 1 # Let the threads spin up

flags = [t1.alive?, # true
t1.stop?, # false
t2.alive?, # true
t2.stop?] # true

To get a complete picture of where a thread is in its life cycle, use the status method. The possible return values of status are something of a hodgepodge: If the thread is currently running, status will return the string "run"; you will get "sleep" if the thread is stopped, sleeping, or waiting on I/O. If the thread terminated normally, you will get false back from status. Alternatively, status will return nil if the thread died horribly with an exception. You can see all of this in the next example:

t1 = Thread.new { loop {} }
t2 = Thread.new { sleep 5 }
t3 = Thread.new { Thread.stop }
t4 = Thread.new { Thread.exit }
t5 = Thread.new { raise "exception" }

sleep 1 # Let the threads spin up

s1 = t1.status # "run"
s2 = t2.status # "sleep"
s3 = t3.status # "sleep"
s4 = t4.status # false
s5 = t5.status # nil

Threads are also aware of the $SAFE global variable; but when it comes to threads, $SAFE is not quite as global as it seems because each thread effectively has its own. This does seem inconsistent, but we probably shouldn’t complain; this allows us to have threads run with different levels of safety. Because $SAFE isn’t terribly global when it comes to threads, Thread instances have a safe_level method that returns the safe level for that particular thread:

t1 = Thread.new { $SAFE = 1; sleep 5 }
t2 = Thread.new { $SAFE = 3; sleep 5 }
sleep 1
level0 = Thread.main.safe_level # 0
level1 = t1.safe_level # 1
level2 = t2.safe_level # 3

Ruby threads also have a numeric priority: Threads with a higher priority will be scheduled more often. You can look at and change the priority of a thread with the priority accessor:

t1 = Thread.new { loop { sleep 1 } }
t2 = Thread.new { loop { sleep 1 } }
t2.priority = 3 # Set t2 at priority 3
p1 = t1.priority # 0
p2 = t2.priority # 3

The special method pass is used when a thread wants to yield control to the scheduler. A thread that calls pass merely yields its current timeslice; it doesn’t actually stop or go to sleep:

t1 = Thread.new do
Thread.pass
puts "First thread"
end

t2 = Thread.new do
puts "Second thread"
end

sleep 3 # Give the threads a change to run.

In this contrived example, we are more likely to get see the second thread print before the first. If we take out the call to pass, we are a bit more likely to see the first thread—which started first—win the race. Of course, when it comes to unsynchronized threads, there are simply no guarantees. The pass method and thread priorities are there to provide hints to the scheduler, not to keep your threads synchronized.

A thread that is stopped may be awakened by use of the run or wakeup method:

t1 = Thread.new do
Thread.stop
puts "There is an emerald here."
end

t2 = Thread.new do
Thread.stop
puts "You're at Y2."
end

sleep 0.5 # Let the threads start

t1.wakeup
t2.run

sleep 0.5 # Give t2 a chance to run

The difference between these is subtle. The wakeup call will change the state of the thread so that it is runnable but will not schedule it to be run; on the other hand, run will wake up the thread and schedule it for immediate running. When we ran this code, we got the following output:

You're at Y2.
There is an emerald here.

Again, depending on the exact timing and the whims of the thread scheduler, the output could have been reversed. In particular, you should not rely on stop, wakeup, and run to synchronize your threads.

The raise instance method will raise an exception in the thread specified as the receiver. The call does not have to originate within the thread:

factorial1000 = Thread.new do
begin
prod = 1
1.upto(1000) {|n| prod *= n }
puts "1000! = #{prod}"
rescue
# Do nothing...
end
end

sleep 0.01 # Your mileage may vary.
if factorial1000.alive?
factorial1000.raise("Stop!")
puts "Calculation was interrupted!"
else
puts "Calculation was successful."
end

The thread spawned in the preceding example tries to calculate the factorial of 1,000; if it doesn’t succeed within a hundredth of a second, the main thread will get impatient and kill it. Thus, on a relatively slow machine, this code fragment will print the message Calculation was interrupted!.

13.1.4 Achieving a Rendezvous (and Capturing a Return Value)

Sometimes one thread wants to wait for another thread to finish. The instance method join will accomplish this:

t1 = Thread.new { do_something_long }

do_something_brief
t1.join # Don't continue until t1 is done

A very common mistake when doing thread programming is to start some threads and then let the main thread exit. The trouble is that the main thread is special: Ruby terminates the entire process. For example, the following code fragment would never give us its final answer without thejoin at the end:

meaning_of_life = Thread.new do
puts "The answer is..."
sleep 2
puts 42
end

meaning_of_life.join # Wait for the thread to finish

Another useful little join idiom is to wait for all the other living threads to finish:

Thread.list.each { |t| t.join if t != Thread.current }

Note that we have to check that the thread is not the current thread before calling join: It is an error for any thread, even the main thread, to call join on itself.

If two threads try to join the other at the same time, Ruby will detect this deadlock situation and raise an exception:

thr = Thread.new { sleep 1; Thread.main.join }

thr.join # Deadlock results!

As you have seen, every thread has an associated block. Elementary Ruby knowledge tells us that a block can have a return value. This implies that a thread can return a value. The value method will implicitly perform a join operation and wait for the thread to complete; then it will return the value of the last evaluated expression in the thread:

max = 10000
t = Thread.new do
sleep 0.2 # Simulate some deep thought time
42
end

puts "The secret is #{t.value}"

13.1.5 Dealing with Exceptions

What happens if an exception occurs within a thread? Under normal circumstances, an exception inside a thread will not raise in the main thread. Here is an example of a thread raising an exception:

t1 = Thread.new do
puts "Hello"
sleep 1
raise "some exception"
end

t2 = Thread.new do
sleep 2
puts "Hello from the other thread"
end

sleep 3
puts "The End"

That example says hello from both threads, and the main thread announces the end.

Exceptions inside threads are not raised until the join or value method is called on that thread. It is up to some other thread to check on the thread that failed and report the failure. Here is an example of catching an exception raised inside a thread in a safe manner:

t1 = Thread.new do
raise "Oh no!"
puts "This will never print"
end

begin
t1.status # nil, indicating an exception occurred
t1.join
rescue => e
puts "Thread raised #{e.class}: #{e.message}"
end

The example will print the text “Thread raised RuntimeError: Oh no!” It is important to raise errors from threads at a known point so that they can be rescued or re-raised without interrupting critical sections of other threads.

While debugging threaded code, it can sometimes be helpful to use the abort_on_exception flag. When it is set to true (for a single thread or globally on the Thread class), uncaught exceptions will terminate all running threads.

Let’s try that initial example again with abort_on_exception set to true on thread t1:

t1 = Thread.new do
puts "Hello"
sleep 1
raise "some exception"
end

t1.abort_on_exception = true # Watch the fireworks!

t2 = Thread.new do
sleep 2
puts "Hello from the other thread"
end

sleep 3
puts "The End"

In this case, the only output you will see is a single Hello because the uncaught exception in thread t1 will abort both the main thread and t2.

Whereas t1.abort_on_exception only affects the behavior of the one thread, the class-level Thread.abort_on_exception will take down all the threads if any of them raises an uncaught exception.

Note that this option is only suitable for development or debugging because it is equivalent to Thread.list.each(&:kill). The kill method is (ironically) not thread-safe. Only use kill or abort_on_exception for debugging or to terminate threads that are definitely safe to abruptly kill. Aborted threads can hold a lock or be prevented from running clean-up code in an ensure block, which can leave your program in an unrecoverable state.

13.1.6 Using a Thread Group

A thread group is a way of managing threads that are logically related to each other. Normally all threads belong to the Default thread group (which is a class constant). However, you can create thread groups of your own and add threads to them. A thread can only be in one thread group at a time so that when a thread is added to a thread group, it is automatically removed from whatever group it was in previously.

The ThreadGroup.new class method will create a new thread group, and the add instance method will add a thread to the group:

t1 = Thread.new("file1") { sleep(1) }
t2 = Thread.new("file2") { sleep(2) }

threads = ThreadGroup.new
threads.add t1
threads.add t2

The instance method list will return an array of all the threads in the thread group:

# Count living threads in this_group
count = 0
this_group.list.each {|x| count += 1 if x.alive? }
if count < this_group.list.size
puts "Some threads in this_group are not living."
else
puts "All threads in this_group are alive."
end

ThreadGroup instances also feature the oddly named enclose method, which mostly prevents new threads from being added to the group:

tg = ThreadGroup.new
tg.enclose # Enclose the group
tg.add Thread.new {sleep 1} # Boom!

We say “mostly” because any new threads started from a thread already in an enclosed group will still be added to the group. Thread groups also have an enclosed? instance method that will return true if the group has in fact been enclosed.

There is plenty of room for useful methods to be added to ThreadGroup. The following example shows methods to wake up every thread in a group, to wait for all threads to catch up (via join), and to kill all threads in a group:

class ThreadGroup

def wakeup
list.each { |t| t.wakeup }
end

def join
list.each { |t| t.join if t != Thread.current }
end

def kill
list.each { |t| t.kill }
end

end

One thing to keep in mind about thread groups is that as threads die, they are silently removed from their group. So just because you stick threads in a group does not mean they will all be there ten minutes or ten seconds from now.

13.2 Synchronizing Threads

Why is synchronization necessary? It is because the “interleaving” of operations causes variables and other entities to be accessed in ways that are not obvious from reading the code of the individual threads. Two or more threads accessing the same variable may interact with each other in ways that are unforeseen and difficult to debug.

Let’s take this simple piece of code as an example:

def new_value(i)
i + 1
end

x = 0

t1 = Thread.new do
1.upto(1000000) { x = new_value(x) }
end

t2 = Thread.new do
1.upto(1000000) { x = new_value(x) }
end

t1.join
t2.join
puts x

In the example, we start by setting the variable x to zero. Then we start two threads; each thread increments x a million times using the new_value method. Finally, we print out the value of x. Logic tells us that x should be two million when it is printed out. However, sometimes running this code only results in one million!

When this is run on JRuby, the results are even more unexpected: 1143345 on one run, followed by 1077403 on the next and 1158422 on a third. Is this a terrible bug in Ruby?

Not really. Our code assumes that the incrementing of an integer is an atomic (or indivisible) operation. But it isn’t. Consider the logic flow in the following code example. We put thread t1 on the left side and t2 on the right. We put each separate timeslice on a separate line and assume that when we enter this piece of logic, x has the value 123:

t1 t2
__________________________ __________________________

Retrieve value of x (123)
Retrieve value of x (123)
Add one to value (124)
Add one to value (124)
Store 124 back in x
Store 124 back in x

It should be clear that each thread is doing a simple increment from its own point of view. But it can also been seen that, in this case, x is still 124 after having been incremented by both threads. We deliberately made things worse in our example by introducing the call to new_value, which increases the interval between retrieving the value of x and storing the incremented value. Even the relatively minuscule delay of a single function call is enough to dramatically decrease the number of times the value is incremented correctly.

MRI’s ability to sometimes produce the correct result might lead a casual observer to think this code is thread-safe. The problem is hidden as a side effect of MRI’s GIL, or Global Interpreter Lock, which ensures that only one thread can run at a time. As you just saw, even with the GIL, MRI suffers from the same synchronization problem and can still produce the wrong result.

The worse news is that this is only the simplest of synchronization problems. Complex multithreaded programs can be full of subtle “works most of the time” bugs, and the best way to handle these issues is still a hot topic of study by computer scientists and mathematicians.

13.2.1 Performing Simple Synchronization

The simplest form of synchronization is to use Thread.exclusive. The Thread.exclusive method defines a critical section of code: Whenever one thread is in a critical section, no other threads will run.

Using Thread.exclusive is easy: You simply pass in a block. Ruby will ensure that no other thread is running while the code in the block is executed. In the following code, we revisit the previous example and use the Thread.exclusive method to define the critical section and protect the sensitive parts of the code:

def new_value(i)
i + 1
end

x = 0

t1 = Thread.new do
1.upto(1000000) do
Thread.exclusive { x = new_value(x) }
end
end

t2 = Thread.new do
1.upto(1000000) do
Thread.exclusive { x = new_value(x) }
end
end

t1.join
t2.join
puts x

Although we haven’t changed the actual code very much, we have changed the flow of execution quite a bit. The calls to Thread.exclusive prevent our threads from stepping all over each other:

t1 t2
__________________________ __________________________

Retrieve value of x (123)
Add one to value (124)
Store 124 back in x
Retrieve value of x (124)
Add one to value (125)
Store 125 back in x

In practice, using Thread.exclusive presents a number of problems, the key one being just how expansive its effects are. Using Thread.exclusive means that you are blocking almost all other threads, including innocent ones that will never touch any of your precious data.

The almost touches on another problem: Although Thread.exclusive is expansive, it is not completely airtight. There are circumstances under which another thread can run even while a Thread.exclusive block is under way. For example, if you start a second thread inside your exclusive block, that thread will run.

For these reasons, Thread.exclusive is at its best in simple examples like the one in this section. Fortunately, Ruby has a number of other, more flexible and easily targeted thread-synchronization tools.

13.2.2 Synchronizing Access with a Mutex

One such tool is the mutex (short for mutual exclusion). To see Ruby’s mutex in action, let’s rework our last counting example to use one:

require 'thread'

def new_value(i)
i + 1
end

x = 0
mutex = Mutex.new

t1 = Thread.new do
1.upto(1000000) do
mutex.lock
x = new_value(x)
mutex.unlock
end
end

t2 = Thread.new do
1.upto(1000000) do
mutex.lock
x = new_value(x)
mutex.unlock
end
end

t1.join
t2.join
puts x

Instances of the Mutex class provide a sort of privately scoped synchronization: Mutex ensures that only one thread at a time can call lock. If two threads happen to call lock at about the same time, one of the threads will be suspended until the lucky thread—the one that got the lock—calls unlock. Because any given mutex instance only affects the threads that are trying to call lock for that instance, we can have our synchronization without making all but one of our threads come to a halt.

In addition to lock, the Mutex class also has a try_lock method. The try_lock method behaves just like lock, except that it doesn’t block: If another thread already has the lock, try_lock will return false immediately:

require 'thread'

mutex = Mutex.new
t1 = Thread.new do
mutex.lock
sleep 10
end

sleep 1

t2 = Thread.new do
if mutex.try_lock
puts "Locked it"
else
puts "Could not lock" # Prints immediately
end
end

This feature is useful any time a thread doesn’t want to be blocked. Mutex instances also have a synchronize method that takes a block:

x = 0
mutex = Mutex.new

t1 = Thread.new do
1.upto(1000000) do
mutex.synchronize { x = new_value(x) }
end
end

Finally, there is also a mutex_m library defining a Mutex_m module, which can be mixed into a class (or used to extend an object). Any such extended object has the mutex methods so that the object itself can be treated as a mutex:

require 'mutex_m'

class MyClass
include Mutex_m

# Now any MyClass object can call
# lock, unlock, synchronize, ...
# or external objects can invoke
# these methods on a MyClass object.
end

13.2.3 Using the Built-in Queue Classes

The thread library thread.rb has a couple of queueing classes that will be useful from time to time. The class Queue is a thread-aware queue that synchronizes access to the ends of the queue; that is, different threads can share the same queue without interfering with each other. The classSizedQueue is essentially the same, except that it allows a limit to be placed on the number of elements a given queue instance can hold.

Queue and SizedQueue have much the same set of methods available because SizedQueue actually inherits from Queue. The SizedQueue class also has the accessor max, used to get or set the maximum size of the queue:

buff = SizedQueue.new(25)
upper1 = buff.max # 25
# Now raise it...
buff.max = 50
upper2 = buff.max # 50

Listing 13.1 is a simple producer-consumer illustration. The consumer is delayed slightly longer on average (through a longer sleep) so that the items will “pile up” a little.

Listing 13.1 The Producer-Consumer Problem


require 'thread'

buffer = SizedQueue.new(2)

producer = Thread.new do
item = 0
loop do
sleep(rand * 0.1)
puts "Producer makes #{item}"
buffer.enq item
item += 1
end
end

consumer = Thread.new do
loop do
sleep((rand 0.1) + 0.09)
item = buffer.deq
puts "Consumer retrieves #{item}"
puts " waiting = #{buffer.num_waiting}"
end
end

sleep 10 # Run a 10 secs, then die and kill threads


The methods enq and deq are the recommended way to get items into and out of the queue. We can also use push to add to the queue and pop or shift to remove items, but these names have somewhat less mnemonic value when we are explicitly using a queue.

The method empty? will test for an empty queue, and clear will remove all items from a queue. The method size (or its alias length) will return the actual number of items in the queue:

require 'thread'

# Assume no other threads interfering...
buff = Queue.new
buff.enq "one"
buff.enq "two"
buff.enq "three"
n1 = buff.size # 3
flag1 = buff.empty? # false
buff.clear
n2 = buff.size # 0
flag2 = buff.empty? # true

The num_waiting method is the number of threads waiting to access the queue. In a plain-old Queue instance, this is the number of threads waiting to remove elements; with a SizedQueue, num_waiting also includes the threads waiting to add elements to the queue.

An optional parameter, non_block, defaults to false for the deq method in the Queue class. If it is true, an empty queue will give a ThreadError rather than block the thread.

13.2.4 Using Condition Variables

And he called for his fiddlers three.

—“Old King Cole” (traditional folk tune)

A condition variable is really just a queue of threads. It is used in conjunction with a mutex to provide a higher level of control when synchronizing threads. A condition variable allows you to relinquish control of the mutex until a certain condition has been met. Imagine a situation in which a thread has a mutex locked but cannot continue because the circumstances aren’t right. It can sleep on the condition variable and wait to be awakened when the condition is met.

It is important to understand that while a thread is waiting on a condition variable, the mutex is released so that other threads can gain access. It is also important to realize that when another thread does a signal operation (to awaken the waiting thread), the waiting thread reacquires the lock on the mutex.

Let’s look at a slightly tongue-in-cheek example in the tradition of the Dining Philosophers. Imagine a table where three violinists are seated, all of whom want to take turns playing. However, there are only two violins and only one bow. Obviously a violinist can play only if he has one of the violins and the lone bow at the same time.

We keep a count of the violins and bows available. When a player wants a violin or a bow, he must wait for it. In our code, we protect the test with a mutex and do separate waits for the violin and the bow, both associated with that mutex. If a violin or a bow is not available, the thread sleeps. It loses the mutex until it is awakened by another thread signaling that the resource is available, whereupon the original thread wakes up and once again owns the lock on the mutex. Listing 13.2 shows the code.

Listing 13.2 The Three Violinists


require 'thread'

@music = Mutex.new
@violin = ConditionVariable.new
@bow = ConditionVariable.new

@violins_free = 2
@bows_free = 1

def musician(n)
3.times do
sleep rand
@music.synchronize do
@violin.wait(@music) while @violins_free == 0
@violins_free -= 1
puts "#{n} has a violin"
puts "violins #@violins_free, bows #@bows_free"

@bow.wait(@music) while @bows_free == 0
@bows_free -= 1
puts "#{n} has a bow"
puts "violins #@violins_free, bows #@bows_free"
end

sleep rand
puts "#{n}: (...playing...)"
sleep rand
puts "#{n}: Now I've finished."

@music.synchronize do
@violins_free += 1
@violin.signal if @violins_free == 1
@bows_free += 1
@bow.signal if @bows_free == 1
end
end
end

threads = []
3.times {|i| threads << Thread.new { musician(i) } }

threads.each {|t| t.join }


We believe that this solution will never deadlock, though we’ve found it difficult to prove. However, it is interesting to note that this algorithm is not necessarily a fair one. In our tests, the first player always got to play more often than the other two, and the second more often than the third. The cause and cure for this behavior are left, as they say, as an exercise for the reader.

13.2.5 Other Synchronization Techniques

Yet another synchronization mechanism is the monitor, implemented in Ruby in the form of monitor.rb. This technique is somewhat more advanced than the mutex; notably you can nest monitor locks.

Like the Spanish Inquisition, nested locks are mostly unexpected. No one, for example, would ever write the following:

@mutex = Mutex.new

@mutex.synchronize do
@mutex.synchronize do
#...
end
end

But that doesn’t mean that a nested lock can’t happen. What if the call to synchronized lives in a recursive method? Or suppose one method grabs the mutex and then innocently calls another method:

def some_method
@mutex = Mutex.new

@mutex.synchronize do
#...
some_other_method
end
end

def some_other_method
@mutex.synchronize do # Deadlock!
#...
end
end

The answer—at least when you are using a mutex—is deadlock followed by an ugly exception. Because monitors do allow nested locks, this code will run happily:

def some_method
@monitor = Monitor.new

@monitor.synchronize do
#...
some_other_method
end
end

def some_other_method
@monitor.synchronize do # No problem!!
#...
end
end

Like the mutex, Ruby’s monitors come in two flavors: the class version, Monitor, which you have just met, and the very pedantically named MonitorMixin module. MonitorMixin allows you to use instances of any class as a monitor:

class MyMonitor
include MonitorMixin
end

The monitor.rb file also improves on the ConditionVariable class that comes with the standard thread. The monitor.rb version adds the wait_until and wait_while methods, which will block a thread based on a condition. It also allows a timeout while waiting because the wait method has a timeout parameter, which is a number of seconds (defaulting to nil).

Because we are rapidly running out of thread examples, Listing 13.3 presents a rewrite of the Queue and SizedQueue classes using the monitor technique. The code is based on Shugo Maeda’s work, used with permission.

Listing 13.3 Implementing a Queue with a Monitor


# Slightly modified version of code by Shugo Maeda.

require 'monitor'

class OurQueue
def initialize
@que = []
@monitor = Monitor.new
@empty_cond = @monitor.new_cond
end

def enq(obj)
@monitor.synchronize do
@que.push(obj)
@empty_cond.signal
end
end

def deq
@monitor.synchronize do
while @que.empty?
@empty_cond.wait
end
return @que.shift
end
end

def size
@que.size
end
end

class OurSizedQueue < OurQueue
attr :max

def initialize(max)
super()
@max = max
@full_cond = @monitor.new_cond
end

def enq(obj)
@monitor.synchronize do
while @que.length >= @max
@full_cond.wait
end
super(obj)
end
end

def deq
@monitor.synchronize do
obj = super
if @que.length < @max
@full_cond.signal
end
return obj
end
end

def max=(max)
@monitor.synchronize do
@max = max
@full_cond.broadcast
end
end
end


The sync.rb library is one more way of performing thread synchronization (using a two-phase lock with a counter). It defines a Sync_m module used in an include or an extend (much like Mutex_m). This module makes available methods such as sync_locked?, sync_shared?,sync_exclusive?, sync_lock, sync_unlock, and sync_try_lock.

13.2.6 Setting a Timeout for an Operation

Sometimes we want to set a maximum length of time for some code to run. This is most useful when interacting with remote computers over a network, as network requests may or may not ever complete.

The timeout library is a thread-based solution to this problem (see Listing 13.4). The timeout method executes the block associated with the method call; when the specified number of seconds has elapsed, it throws a Timeout::Error, which can be caught with a rescue clause.

Listing 13.4 A Timeout Example


require 'timeout'

flag = false
answer = nil

begin
timeout(5) do
puts "I want a cookie!"
answer = gets.chomp
flag = true
end
rescue TimeoutError
flag = false
end

if flag
if answer == "cookie"
puts "Thank you! Chomp, chomp, ..."
else
puts "That's not a cookie!"
exit
end
else
puts "Hey, too slow!"
exit
end

puts "Bye now..."


Timeouts in Ruby come with two important caveats, however. First, they are not thread-safe. The underlying mechanism of timeouts is to create a new thread, monitor that thread for completion, and forcibly kill the thread if it has not completed within the timeout specified. As mentioned in Section 13.1.5, “Dealing with Exceptions,” killing threads may not be a thread-safe operation, because it can leave your program in an unrecoverable state.

To create a timeout-like effect safely, write the processing thread such that it will periodically check if it needs to abort. This allows the thread to do any required cleanup while terminating in a controlled way. Here is an example that calculates as many prime numbers as possible in 5 seconds, without requiring an external thread to forcibly abort the computation:

require 'prime'
primes = []
generator = Prime.each
start = Time.now

while Time.now < (start + 5)
10.times { primes << generator.next }
end

puts "Ran for #{Time.now - start} seconds."
puts "Found #{primes.size} primes, ending in #{primes.last}"

In testing, this code printed “Ran for 5.005569 seconds. Found 2124010 primes, ending in 34603729.” Although your results may vary, this example demonstrates how to monitor for a timeout without having to forcibly kill an executing thread.

Second, keep in mind that the Timeout::Error exception will not be caught by a simple rescue with no arguments. Calling rescue with no arguments will automatically rescue any instance of StandardError and its subclasses, but Timeout::Error is not a subclass ofStandardError.

If you are in any situation that might raise a timeout error, be sure to rescue both types of errors. Note that this includes any usage of the Net::HTTP library, which uses the Timeout library to time out requests that take too long.

13.2.7 Waiting for an Event

In many situations, we might want to have one or more threads monitoring the “outside world” while other threads are doing other things. The examples here are all rather contrived, but they do illustrate the general principle.

In the following example, we see three threads doing the “work” of an application. Another thread simply wakes up every second, checks the global variable $flag, and wakes up two other threads when it sees the flag set. This saves the three worker threads from interacting directly with the two other threads and possibly making multiple attempts to awaken them:

$flag = false
work1 = Thread.new { job1() }
work2 = Thread.new { job2() }
work3 = Thread.new { job3() }

thread4 = Thread.new { Thread.stop; job4() }
thread5 = Thread.new { Thread.stop; job5() }

watcher = Thread.new do
loop do
sleep 1
if $flag
thread4.wakeup
thread5.wakeup
Thread.exit
end
end
end

If at any point during the execution of the job methods the variable $flag becomes true, thread4 and thread5 are guaranteed to start within a second. After that, the watcher thread terminates.

In this next example, we are waiting for a file to be created. We check every second for it, and start another thread if we see it; meanwhile, other threads can be doing anything at all. Actually, we are watching for three separate files here:

def process_file(filename)
puts "processing file #{filename}"
end

def waitfor(filename)
loop do
if File.exist? filename
puts "found #{filename}"
file_processor = Thread.new { process_file(filename) }
Thread.exit
else
sleep 1
end
end
end
waiter1 = Thread.new { waitfor("Godot") }
waiter2 = Thread.new { waitfor("Guffman") }
headwaiter = Thread.new { waitfor("head") }

# Main thread goes off to do other things...

There are many other situations in which a thread might wait for an outside event, such as a networked application where the server at the other end of a socket is slow or unreliable.

13.2.8 Collection Searching in Parallel

Threads also make it straightforward to work on a number of alternative solutions simultaneously. To see how this might work, let’s write some code to find the biggest number in a bunch of arrays in a limited amount of time. Our threaded_max method takes a time limit argument, as well as an array containing the arrays of numbers that are to be searched. It returns the biggest number it can find within the given time limit.

To get the most out of a limited time, our code doesn’t just search through all of the arrays one after another. Instead, it starts a bunch of threads, one for each array:

require 'thread'

def threaded_max(interval, collections)
threads = []

collections.each do |col|
threads << Thread.new do
me = Thread.current
me[:result] = col.first
col.each do |n|
me[:result] = n if n > me[:result]
end
end
end

sleep(interval)

threads.each {|t| t.kill }
results = threads.map {|t| t[:result]}

results.compact.max # Max be nil
end

collections = [
[ 1, 25, 3, 7, 42, 64, 55 ],
[ 3, 77, 1, 2, 3, 5, 7, 9, 11, 13, 102, 67, 2, 1],
[ 3, 33, 7, 44, 77, 92, 10, 11]]

biggest = threaded_max(0.5, collections)

A few of things to note about this code: First, to keep things simple, the code assumes that there is actually something in each array.

Second, the threads report their results back in a thread local value called :result. The threads update :result every step of the way because the main bit of the threaded_max method will only wait so long before it kills off all of the threads and returns the biggest value found so far. Finally, threaded_max can conceivably actually return nil: No matter how long we wait, it is possible that the threads will not have done anything before threaded_max runs out of patience. (Not very likely, but possible.)

Does using a bunch of threads actually make things faster? It’s hard to say. The answer probably depends on your operating system as well as on the number of arrays you are searching.

13.2.9 Recursive Deletion in Parallel

Just for fun, let’s write some code to delete an entire directory tree, and let’s make it concurrent. The trick to this example is that each time we come across a directory, we start a new thread to delete that directory and its contents.

As we go along, we keep track of the threads we’ve created in an array called threads; because this is a local variable, each thread will have its own copy of the array. It can be accessed by only one thread at a time, and there is no need to synchronize access to it.

Note also that we pass fullname into the thread block so that we don’t have to worry about the thread accessing a variable that is changing. The thread uses fn as a local copy of the same variable.

When we have traversed an entire directory, we want to wait on the threads we have created before deleting the directory we’ve just finished working on:

def delete_all(dir)
threads = []
Dir.foreach(dir) do |e|
next if [".",".."].include? e # Skip . and ..
fullname = dir + "/" + e
if FileTest.directory?(fullname)
threads << Thread.new(fullname) {|fn| delete_all(fn) }
else
File.delete(fullname)
puts "delete: #{fullname}"
end
end
threads.each { |t| t.join }
Dir.delete(dir)
puts "deleting dir #{dir}"
end

delete_all("/tmp/stuff")

Is this actually faster than the non-threaded version? We’ve found that the answer is not consistent. It probably depends on your operating system as well as on the actual directory structure being deleted—that is, its depth, size of files, and so on.

13.3 Fibers and Cooperative Multitasking

Along with full operating system threads, Ruby also provides fibers, which can be described as cut-down threads or as code blocks with superpowers.

Fibers do not create an OS thread, but they can contain a block of code that maintains its state, can be paused and resumed, and can yield results. To see what this means, let’s start by creating one with a call to Fiber.new:

fiber = Fiber.new do
x = 2
Fiber.yield x
x = x * 2
Fiber.yield x
x * 2
end

Fiber.new doesn’t actually cause any of the code in the fiber’s block to execute. To get things going, you need to call the resume method. Calling resume will cause the code inside of the block to run until it either hits the end of the block or Fiber.yield is called. Because our fiber has a Fiber.yield on the second line of the block, the initial call to resume will run until that second line. The call to resume returns whatever is passed to Fiber.yield so that our call to resume will return 2:

answer1 = fiber.resume # answer1 will be 2

So far this is all very lambda-like, but now things get interesting. If we call resume a second or third time, the fiber will pick up where it left off:

answer2 = fiber.resume # should be 4
answer3 = fiber.resume # should be 8

Note that if the execution hits the end of the block, fibers will do the usual block thing and return the last value computed in the block.

Given all this, you can look at a fiber as a restartable code block: Each time a fiber hits a Fiber.yield, the fiber remembers where it is and lets you restart from that point. You can keep restarting a fiber until it finishes running the code block; call resume after that and you will raise aFiberError.

As we say, a fiber is a sort of restartable code block. However, you can also look at a fiber as a sort of thread, a thread that you schedule by hand. Instead of running more or less continuously in the background, fibers run in the thread that calls resume, hanging onto it only until the fiber finishes or hits the next Fiber.yield. In other words, whereas threads implement preemptive multitasking, fibers implement cooperative multitasking.

Along with the basic Fiber.new and yield, you can get at some bonus fiber methods if you require the 'fiber' library. Doing so will give you access to Fiber.current, which returns the currently executing fiber instance. You also get the alive? instance method, which will tell you if a fiber is still alive. Finally, the fiber library will give every fiber instance the transfer instance method, which allows fibers to transfer control from one to the other (making fibers truly cooperative).

To fill out our look at fibers, here is the multiple-array-searching example rewritten to use fibers:

require 'fiber'

class MaxFiber < Fiber
attr_accessor :result
end

def max_by_fiber(interval, collections)
fibers = []
collections.each_with_index do |numbers|
fibers << MaxFiber.new do
me = Fiber.current
me.result = numbers[0]
numbers.each_with_index do |n, i|
me.result = n if n > me.result
Fiber.yield me.result if (i+1) % 3 == 0
end
me.result
end
end

start_time = Time.now
while fibers.any? &:alive?
break if Time.now - start_time > interval
fibers.each {|f| puts f.resume if f.alive? }
end

values = fibers.map &:result
values.compact.max || 0
end

collections = [
[ 1, 25, 3, 7, 42, 64, 55 ],
[ 3, 77],
[ 3, 33, 7, 44, 77, 102, 92, 10, 11],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 77, 2, 3 ]]

biggest = max_by_fiber(0.5, collections)

Because we are using fibers instead of threads, there is no real parallelism going on here. Instead, the fibers just give us a nice way of keeping track of where we are as we do a round-robin search through the various arrays.

The power that fibers provide, of pausing execution until a later time, is most useful when it is useful to wait before calculating the next item in a succession. As you may have noticed, this is very similar to how Ruby’s Enumerator objects work. Perhaps unsurprisingly, every enumerator is in fact implemented using fibers. The next, take, and every other enumerator method simply calls resume on its fiber as many times as is needed to produce the requested results.

13.4 Conclusion

As you have seen in this chapter, Ruby provides support for threading via native OS threads. However, the GIL ensures only one thread can run at a time in MRI, whereas JRuby and Rubinius allow multiple threads to run at once.

This chapter gave you a hint of just how difficult multithreaded programming can be and how to use Ruby’s synchronization primitives, such as mutexes, monitors, condition variables, and queues, to keep your threads coordinated. Finally, we had a quick look at fibers, Ruby’s “almost-not-quite threads.”

In Chapter 14, “Scripting and System Administration,” we move away from a discussion of programming technique in itself to a more task-oriented topic. We’ll be discussing the use of Ruby for everyday scripting and system administration tasks.