Fibers, Threads, and Processes - Facets of Ruby - Programming Ruby 1.9 & 2.0: The Pragmatic Programmers’ Guide (2013)

Part 1. Facets of Ruby

Chapter 12. Fibers, Threads, and Processes

Ruby gives you two basic ways to organize your program so that you can run different parts of it apparently “at the same time.” Fibers let you suspend execution of one part of your program and run some other part. For more decoupled execution, you can split up cooperating tasks within the program, using multiple threads, or you can split up tasks between different programs, using multiple processes. Let’s look at each in turn.

12.1 Fibers

Ruby 1.9 introduced fibers. Although the name suggests some kind of lightweight thread, Ruby’s fibers are really just a very simple coroutine mechanism. They let you write programs that look like you are using threads without incurring any of the complexity inherent in threading. Let’s look at a simple example. We’d like to analyze a text file, counting the occurrence of each word. We could do this (without using fibers) in a simple loop:

counts = Hash.new(0)
File.foreach("testfile") do |line|
  line.scan(/\w+/) do |word|
    word = word.downcase
    counts[word] += 1
  end
end
counts.keys.sort.each {|k| print "#{k}:#{counts[k]} "}

Produces:

and:1 is:3 line:3 on:1 one:1 so:1 this:3 three:1 two:1

However, this code is messy—it mixes word finding with word counting. We could fix this by writing a method that reads the file and yields each successive word. But fibers give us a simpler solution:

words = Fiber.new do
  File.foreach("testfile") do |line|
    line.scan(/\w+/) do |word|
      Fiber.yield word.downcase
    end
  end
  nil
end

counts = Hash.new(0)
while word = words.resume
  counts[word] += 1
end
counts.keys.sort.each {|k| print "#{k}:#{counts[k]} "}

Produces:

and:1 is:3 line:3 on:1 one:1 so:1 this:3 three:1 two:1

The constructor for the Fiber class takes a block and returns a fiber object. For now, the code in the block is not executed.

Subsequently, we can call resume on the fiber object. This causes the block to start execution. The file is opened, and the scan method starts extracting individual words. However, at this point, Fiber.yield is invoked. This suspends execution of the block—the resume method that we called to run the block returns any value given to Fiber.yield.

Our main program enters the body of the loop and increments the count for the first word returned by the fiber. It then loops back up to the top of the while loop, which again calls words.resume while evaluating the condition. The resume call goes back into the block, continuing just after it left off (at the line after the Fiber.yield call).

When the fiber runs out of words in the file, the foreach block exits, and the code in the fiber terminates. Just as with a method, the return value of the fiber will be the value of the last expression evaluated (in this case the nil).[56] The next time resume is called, it returns this value nil. You’ll get a FiberError if you attempt to call resume again after this.
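The life cycle described here can be checked with a short sketch that needs no input file. The fiber below and the helper name resume_until_dead are illustrative only and do not appear in the original example; the helper records each value that resume returns and notes when FiberError is finally raised.

```ruby
# Illustrative helper (not from the text): keep resuming a fiber, collecting
# each value resume returns, until resuming raises FiberError.
def resume_until_dead(fiber)
  results = []
  begin
    loop { results << fiber.resume }
  rescue FiberError
    results << :fiber_error   # resuming a finished fiber raises FiberError
  end
  results
end

letters = Fiber.new do
  Fiber.yield :a
  Fiber.yield :b
  :done   # value of the last expression: returned by the final successful resume
end

p resume_until_dead(letters)   # => [:a, :b, :done, :fiber_error]
```

The third resume returns the block's final value rather than a yielded one, and only the fourth call fails.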

Fibers are often used to generate values from infinite sequences on demand. Here’s a fiber that returns successive integers divisible by 2 and not divisible by 3:

twos = Fiber.new do
  num = 2
  loop do
    Fiber.yield(num) unless num % 3 == 0
    num += 2
  end
end
10.times { print twos.resume, " " }

Produces:

2 4 8 10 14 16 20 22 26 28

Because fibers are just objects, you can pass them around, store them in variables, and so on. Fibers can be resumed only in the thread that created them.

Ruby 2.0 adds a new twist to this: you can now use lazy enumerators to gracefully handle infinite lists. These are described in the section Lazy Enumerators in Ruby 2.
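As a rough sketch of the lazy approach (assuming Ruby 2.0 or later), the same sequence of even numbers that are not multiples of 3 can be written without an explicit fiber. The method name twos_not_threes is made up for this illustration.

```ruby
# Lazy counterpart of the `twos` fiber: even integers not divisible by 3.
# Nothing is computed until values are actually requested.
def twos_not_threes
  (1..Float::INFINITY).lazy.select { |n| n.even? && n % 3 != 0 }
end

p twos_not_threes.first(10)   # => [2, 4, 8, 10, 14, 16, 20, 22, 26, 28]
```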

Fibers, Coroutines, and Continuations

The basic fiber support in Ruby is limited—fibers can yield control only back to the code that resumed them. However, Ruby comes with two standard libraries that extend this behavior. The fiber library (described in the library section) adds full coroutine support. Once it is loaded, fibers gain a transfer method, allowing them to transfer control to arbitrary other fibers.

A related but more general mechanism is the continuation. A continuation is a way of recording the state of your running program (where it is, the current binding, and so on) and then resuming from that state at some point in the future. You can use continuations to implement coroutines (and other new control structures). Continuations have also been used to store the state of a running web application between requests: a continuation is created when the application sends a response to the browser; then, when the next request arrives from that browser, the continuation is invoked, and the application continues from where it left off. You enable continuations in Ruby by requiring the continuation library, described in the library section.

12.2 Multithreading

Often the simplest way to do two things at once is to use Ruby threads. Prior to Ruby 1.9, these were implemented as green threads: threads were switched within the interpreter. In Ruby 1.9, threading is now performed by the operating system. This is an improvement, but not quite as big an improvement as you might want. Although native threads could in principle take advantage of multiple processors (and multiple cores in a single processor), there’s a major catch. Many Ruby extension libraries are not thread safe (because they were written for the old threading model). So, Ruby compromises: it uses native operating system threads but operates only a single thread at a time. You’ll never see two threads in the same application running Ruby code truly concurrently. (You will, however, see threads busy doing, say, I/O while another thread executes Ruby code. That’s part of the point.)
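A small timing sketch illustrates that last point. It assumes that sleep is a fair stand-in for a thread blocked on I/O; the method name timed_sleepers is invented for this example. If blocked threads overlap, three threads that each wait 0.2 seconds should finish in roughly 0.2 seconds in total, not 0.6.

```ruby
# Start `count` threads that each block for `seconds`, wait for all of them,
# and return the elapsed wall-clock time for the whole batch.
def timed_sleepers(count, seconds)
  started = Time.now
  count.times.map { Thread.new { sleep seconds } }.each(&:join)
  Time.now - started
end

elapsed = timed_sleepers(3, 0.2)
puts "3 sleeping threads took %.2f seconds" % elapsed
```

The exact figure varies from run to run, so no output is shown here.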

Creating Ruby Threads

Creating a new thread is pretty straightforward. The code that follows is a simple example. It downloads a set of web pages in parallel. For each URL that it is asked to download, the code creates a separate thread that handles the HTTP transaction.

require 'net/http'

pages = %w( www.rubycentral.org slashdot.org www.google.com )

threads = pages.map do |page_to_fetch|
  Thread.new(page_to_fetch) do |url|
    http = Net::HTTP.new(url, 80)
    print "Fetching: #{url}\n"
    resp = http.get('/')
    print "Got #{url}: #{resp.message}\n"
  end
end

threads.each {|thr| thr.join }

Produces:

Fetching: www.rubycentral.org

Fetching: slashdot.org

Fetching: www.google.com

Got www.google.com: OK

Got slashdot.org: OK

Got www.rubycentral.org: OK

Let’s look at this code in more detail, because a few subtle things are happening.

New threads are created with the Thread.new call. It is given a block that contains the code to be run in a new thread. In our case, the block uses the net/http library to fetch the top page from each of our nominated sites. Our tracing clearly shows that these fetches are going on in parallel.

When we create the thread, we pass the required URL as a parameter. This parameter is passed to the block as url. Why do we do this, rather than simply using the value of the variable page_to_fetch within the block?

A thread shares all global, instance, and local variables that are in existence at the time the thread starts. As anyone with a kid brother can tell you, sharing isn’t always a good thing. In this case, all three threads would share the variable page_to_fetch. The first thread gets started, and page_to_fetch is set to "www.rubycentral.org". In the meantime, the loop creating the threads is still running. The second time around, page_to_fetch gets set to "slashdot.org". If the first thread has not yet finished using the page_to_fetch variable, it will suddenly start using this new value. These kinds of bugs are difficult to track down.

However, local variables created within a thread’s block are truly local to that thread—each thread will have its own copy of these variables. In our case, the variable url will be set at the time the thread is created, and each thread will have its own copy of the page address. You can pass any number of arguments into the block via Thread.new.

This code also illustrates a gotcha. Inside the loop, the threads use print to write out the messages, rather than puts. Why? Well, behind the scenes, puts splits its work into two chunks: it writes its argument, and then it writes a newline. Between these two, a thread could get scheduled, and the output would be interleaved. Calling print with a single string that already contains the newline gets around the problem.
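The way Thread.new hands its arguments to the block can be tried without any network access. This is a minimal sketch; the method name labelled_squares and the "sq" label are invented for the illustration. It passes two arguments per thread and collects each block's result with Thread#value, which waits for the thread to finish.

```ruby
# Each thread receives its own copies of the arguments given to Thread.new.
def labelled_squares(numbers)
  threads = numbers.map do |n|
    Thread.new(n, "sq") do |value, label|   # two arguments, two block parameters
      "#{label}(#{value}) = #{value * value}"
    end
  end
  threads.map(&:value)   # value waits for the thread and returns the block's result
end

p labelled_squares([1, 2, 3])   # => ["sq(1) = 1", "sq(2) = 4", "sq(3) = 9"]
```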

Manipulating Threads

Another subtlety occurs on the last line in our download program. Why do we call join on each of the threads we created?

When a Ruby program terminates, all threads are killed, regardless of their states. However, you can wait for a particular thread to finish by calling that thread’s Thread#join method. The calling thread will block until the given thread is finished. By calling join on each of the requester threads, you can make sure that all three requests have completed before you terminate the main program. If you don’t want to block forever, you can give join a timeout parameter: if the timeout expires before the thread terminates, the join call returns nil. Another variant of join, the method Thread#value, returns the value of the last statement executed by the thread.
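Both behaviors are easy to observe in a short sketch. The method name join_then_value and the timings are arbitrary choices for the illustration: a worker takes 0.3 seconds, the first join gives up after 0.05 seconds, and value then waits for the real result.

```ruby
# Returns [result of a join with a timeout, the thread's eventual value].
def join_then_value(work_seconds, timeout)
  worker = Thread.new { sleep work_seconds; :finished }
  timed = worker.join(timeout)   # nil if the timeout expires first
  [timed, worker.value]          # value blocks until the thread completes
end

p join_then_value(0.3, 0.05)   # => [nil, :finished]
```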

In addition to join, a few other handy routines are used to manipulate threads. The current thread is always accessible using Thread.current. You can obtain a list of all threads using Thread.list, which returns a list of all Thread objects that are runnable or stopped. To determine the status of a particular thread, you can use Thread#status and Thread#alive?.
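Here is a sketch of these inspection methods at work. The method name status_snapshots is invented, and a Queue is used purely as a convenient way to keep the worker blocked until we let it go. The snapshot is taken once while the worker is blocked and once after it has finished.

```ruby
require 'thread'   # Queue lives here in Ruby 1.9 and 2.0

# Returns [status, alive?, listed in Thread.list?] before and after a worker finishes.
def status_snapshots
  gate = Queue.new
  worker = Thread.new { gate.pop }              # blocks until something is pushed
  Thread.pass until worker.status == "sleep"    # wait until it is really blocked
  before = [worker.status, worker.alive?, Thread.list.include?(worker)]
  gate << :go                                   # let the worker finish
  worker.join
  after = [worker.status, worker.alive?, Thread.list.include?(worker)]
  [before, after]
end

p status_snapshots   # => [["sleep", true, true], [false, false, false]]
```

A status of false means the thread terminated normally; a thread that died with an exception reports nil instead.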

You can adjust the priority of a thread using Thread#priority=. Higher-priority threads will run before lower-priority threads. We’ll talk more about thread scheduling, and stopping and starting threads, in just a bit.

Thread Variables

A thread can normally access any variables that are in scope when the thread is created. Variables local to the block containing the thread code are local to the thread and are not shared. But what if you need per-thread variables that can be accessed by other threads, including the main thread? Class Thread has a facility that allows thread-local variables to be created and accessed by name. You simply treat the thread object as if it were a Hash, writing to elements using []= and reading them back using []. In the example that follows, each thread records the current value of the variable count in a thread-local variable with the key mycount. To do this, the code uses the symbol :mycount when indexing thread objects. (A race condition[57] exists in this code, but we haven’t talked about synchronization yet, so we’ll just quietly ignore it for now.)

count = 0
threads = 10.times.map do |i|
  Thread.new do
    sleep(rand(0.1))
    Thread.current[:mycount] = count
    count += 1
  end
end

threads.each {|t| t.join; print t[:mycount], ", " }
puts "count = #{count}"

Produces:

7, 0, 6, 8, 4, 5, 1, 9, 2, 3, count = 10

The main thread waits for each subthread to finish and then prints the value of count that the thread recorded. Just to make it interesting, each thread waits a random time before recording the value.

Threads and Exceptions

What happens if a thread raises an unhandled exception depends on the setting of the abort_on_exception flag (documented in the reference) and on the setting of the interpreter’s $DEBUG flag (described in the Ruby options section).

If abort_on_exception is false and the debug flag is not enabled (the default condition), an unhandled exception simply kills the current thread—all the rest continue to run. In fact, you don’t even hear about the exception until you issue a join on the thread that raised it. In the following example, thread 1 blows up and fails to produce any output. However, you can still see the trace from the other threads.

threads = 4.times.map do |number|
  Thread.new(number) do |i|
    raise "Boom!" if i == 1
    print "#{i}\n"
  end
end

puts "Waiting"
sleep 0.1
puts "Done"

Produces:

0

2

Waiting

3

Done

You normally don’t sleep waiting for threads to terminate; you’d use join. If you join to a thread that has raised an exception, then that exception will be raised in the thread that does the joining:

threads = 4.times.map do |number|
  Thread.new(number) do |i|
    raise "Boom!" if i == 1
    print "#{i}\n"
  end
end

puts "Waiting"
threads.each do |t|
  begin
    t.join
  rescue RuntimeError => e
    puts "Failed: #{e.message}"
  end
end
puts "Done"

Produces:

0

Waiting

2

3

Failed: Boom!

Done

However, set abort_on_exception to true or use -d to turn on the debug flag, and an unhandled exception kills the main thread, so the message Done never appears. (This is different from Ruby 1.8, where the exception killed all running threads.)

Thread.abort_on_exception = true
threads = 4.times.map do |number|
  Thread.new(number) do |i|
    raise "Boom!" if i == 1
    print "#{i}\n"
  end
end

puts "Waiting"
threads.each {|t| t.join }
puts "Done"

Produces:

0

2

prog.rb:4:in `block (2 levels) in <main>': Boom! (RuntimeError)

12.3 Controlling the Thread Scheduler

In a well-designed application, you’ll normally just let threads do their thing; building timing dependencies into a multithreaded application is generally considered to be bad form, because it makes the code far more complex and also prevents the thread scheduler from optimizing the execution of your program.

The Thread class provides a number of methods that control the scheduler. Invoking Thread.stop stops the current thread, and invoking Thread#run arranges for a particular thread to be run. Thread.pass deschedules the current thread, allowing others to run, and Thread#join and Thread#value suspend the calling thread until a given thread finishes. These last two are the only low-level thread control methods that the average program should use. In fact, we now consider most of the other low-level thread control methods too dangerous to use correctly in programs we write.[58] Fortunately, Ruby has support for higher-level thread synchronization.

12.4 Mutual Exclusion

Let’s start by looking at a simple example of a race condition—multiple threads updating a shared variable:

sum = 0
threads = 10.times.map do
  Thread.new do
    100_000.times do
      new_value = sum + 1
      print "#{new_value} " if new_value % 250_000 == 0
      sum = new_value
    end
  end
end

threads.each(&:join)
puts "\nsum = #{sum}"

Produces:

250000 250000 250000 250000 250000 500000 500000

sum = 599999

We create 10 threads, and each increments the shared sum variable 100,000 times. And yet, when the threads all finish, the final value in sum is considerably less than 1,000,000. Clearly we have a race condition. The reason is the print call that sits between the code that calculates the new value and the code that stores it back into sum. In one thread, the updated value gets calculated. Let’s say that the value of sum is 99,999, so new_value will be 100,000. Before storing the new value back into sum, we call print, and that causes another thread to be scheduled (because we’re waiting for the I/O to complete). So a second thread also fetches the value of 99,999 and increments it. It stores 100,000 into sum, then loops around again and stores 100,001, and 100,002, and so on. Eventually the original thread continues running because it finished writing its message. It immediately stores its value of 100,000 into sum, overwriting (and losing) all the values stored by the other thread(s). We lost data.

Fortunately, that’s easy to fix. We use the built-in class Mutex to create synchronized regions—areas of code that only one thread may enter at a time.

Some grade schools coordinate students’ access to the bathrooms during class time using a system of bathroom passes. Each room has two passes, one for girls and one for boys. To visit the bathroom, you have to take the appropriate pass with you. If someone else already has that pass, you have to cross your legs and wait for them to return. The bathroom pass controls access to the critical resource—you have to own the pass to use the resource, and only one person can own it at a time.

A mutex is like that bathroom pass. You create a mutex to control access to a resource and then lock it when you want to use that resource. If no one else has it locked, your thread continues to run. If someone else has already locked that particular mutex, your thread suspends (crossing its legs) until they unlock it.

Here’s a version of our counting code that uses a mutex to ensure that only one thread updates the count at a time:

sum = 0
mutex = Mutex.new
threads = 10.times.map do
  Thread.new do
    100_000.times do
      mutex.lock            #### one at a time, please
      new_value = sum + 1   #
      print "#{new_value} " if new_value % 250_000 == 0
      sum = new_value       #
      mutex.unlock          ####
    end
  end
end

threads.each(&:join)
puts "\nsum = #{sum}"

Produces:

250000 500000 750000 1000000

sum = 1000000

This pattern is so common that the Mutex class provides Mutex#synchronize, which locks the mutex, runs the code in a block, and then unlocks the mutex. This also ensures that the mutex will get unlocked even if an exception is thrown while it is locked.

sum = 0
mutex = Mutex.new
threads = 10.times.map do
  Thread.new do
    100_000.times do
      mutex.synchronize do    ####
        new_value = sum + 1   #
        print "#{new_value} " if new_value % 250_000 == 0
        sum = new_value       #
      end                     ####
    end
  end
end

threads.each(&:join)
puts "\nsum = #{sum}"

Produces:

250000 500000 750000 1000000

sum = 1000000
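The claim that synchronize releases the lock even when the block raises can be verified directly. In this sketch the method name locked_after_failure? is invented for the illustration; it raises inside the critical section, rescues the exception outside it, and then asks the mutex whether it is still locked.

```ruby
# Raise inside a synchronized block, then report whether the mutex stayed locked.
def locked_after_failure?
  mutex = Mutex.new
  begin
    mutex.synchronize { raise "failure inside the critical section" }
  rescue RuntimeError
    # The exception propagates out of synchronize; we simply swallow it here.
  end
  mutex.locked?
end

p locked_after_failure?   # => false
```

With a bare lock and unlock pair, the same exception would skip the unlock call and leave the mutex held.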

Sometimes you want to claim a lock if a mutex is currently unlocked, but you don’t want to suspend the current thread if it isn’t. The Mutex#try_lock method takes the lock if it can, but returns false if the lock is already taken. The following code illustrates a hypothetical currency converter. The ExchangeRates class caches rates from an online feed, and a background thread updates that cache once an hour. This update takes a minute or so. In the main thread, we interact with our user. However, rather than just go dead if we can’t claim the mutex that protects the rate object, we use try_lock and print a status message if the update is in process.

rate_mutex = Mutex.new
exchange_rates = ExchangeRates.new
exchange_rates.update_from_online_feed

Thread.new do
  loop do
    sleep 3600
    rate_mutex.synchronize do
      exchange_rates.update_from_online_feed
    end
  end
end

loop do
  print "Enter currency code and amount: "
  line = gets
  if rate_mutex.try_lock
    begin
      puts(exchange_rates.convert(line))
    ensure
      rate_mutex.unlock   # always release, even if convert raises
    end
  else
    puts "Sorry, rates being updated. Try again in a minute"
  end
end
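Because ExchangeRates is hypothetical, the listing above cannot be run as it stands. The following self-contained sketch shows just the try_lock behavior; the method name try_lock_results and the 0.2 second hold time are arbitrary. One thread holds the mutex for a moment while the caller attempts try_lock, first during the hold and then after the holder has finished.

```ruby
# Returns the result of try_lock while another thread holds the mutex,
# followed by the result once that thread has released it.
def try_lock_results
  mutex = Mutex.new
  holder = Thread.new { mutex.synchronize { sleep 0.2 } }
  Thread.pass until mutex.locked?    # wait for the holder to take the lock
  while_held = mutex.try_lock        # lock is taken: returns false immediately
  holder.join
  after_release = mutex.try_lock     # lock is free: we acquire it
  mutex.unlock if after_release
  [while_held, after_release]
end

p try_lock_results   # => [false, true]
```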

If you are holding the lock on a mutex and you want to temporarily unlock it, allowing others to use it, you can call Mutex#sleep. We could use this to rewrite the previous example:

rate_mutex = Mutex.new
exchange_rates = ExchangeRates.new
exchange_rates.update_from_online_feed

Thread.new do
  rate_mutex.lock
  loop do
    rate_mutex.sleep 3600   # releases the lock while sleeping, reacquires on wake
    exchange_rates.update_from_online_feed
  end
end

loop do
  print "Enter currency code and amount: "
  line = gets
  if rate_mutex.try_lock
    begin
      puts(exchange_rates.convert(line))
    ensure
      rate_mutex.unlock   # always release, even if convert raises
    end
  else
    puts "Sorry, rates being updated. Try again in a minute"
  end
end

Queues and Condition Variables

Most of the examples in this chapter use the Mutex class for synchronization. However, Ruby comes with another library that is particularly useful when you need to synchronize work between producers and consumers. The Queue class, located in the thread library, implements a thread-safe queuing mechanism. Multiple threads can add and remove objects from each queue, and each addition and removal is guaranteed to be atomic. For an example, see the description of the thread library.
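As a minimal sketch of the producer and consumer arrangement, one thread below pushes items onto a Queue and another pops and sums them. The method name sum_via_queue and the :done sentinel are conventions chosen for this illustration, not part of the library.

```ruby
require 'thread'   # Queue lives here in Ruby 1.9 and 2.0

# A producer pushes each item, then a sentinel; a consumer sums until it sees it.
def sum_via_queue(items)
  queue = Queue.new
  producer = Thread.new do
    items.each { |item| queue << item }
    queue << :done                       # sentinel: tells the consumer to stop
  end
  consumer = Thread.new do
    total = 0
    while (item = queue.pop) != :done    # pop blocks while the queue is empty
      total += item
    end
    total
  end
  producer.join
  consumer.value
end

p sum_via_queue([1, 2, 3, 4, 5])   # => 15
```

No explicit mutex is needed: the queue takes care of the locking around each push and pop.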

A condition variable is a controlled way of communicating an event (or a condition) between two threads. One thread can wait on the condition, and the other can signal it. The thread library extends threads with condition variables. Again, see the Monitor library for an example.
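A condition variable is always used together with a mutex. In the sketch below (the method name doubled_when_ready and the variable names are made up), a consumer waits until some shared data has been set, and the main thread sets it and signals. The wait is wrapped in an until loop so the code is still correct if the signal is sent before the consumer starts waiting.

```ruby
require 'thread'   # ConditionVariable lives here in Ruby 1.9 and 2.0

# The consumer waits for `data` to be set, then returns twice its value.
def doubled_when_ready(input)
  mutex = Mutex.new
  ready = ConditionVariable.new
  data  = nil

  consumer = Thread.new do
    mutex.synchronize do
      ready.wait(mutex) until data   # wait releases the mutex while blocked
      data * 2
    end
  end

  mutex.synchronize do
    data = input
    ready.signal                     # wake one thread waiting on the condition
  end
  consumer.value
end

p doubled_when_ready(21)   # => 42
```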

12.5 Running Multiple Processes

Sometimes you may want to split a task into several process-sized chunks—maybe to take advantage of all those cores in your shiny new processor. Or perhaps you need to run a separate process that was not written in Ruby. Not a problem: Ruby has a number of methods by which you may spawn and manage separate processes.

Spawning New Processes

You have several ways to spawn a separate process; the easiest is to run some command and wait for it to complete. You may find yourself doing this to run some separate command or retrieve data from the host system. Ruby does this for you with the system and backquote (or backtick) methods:

system("tar xzf test.tgz") # => true

`date` # => "Mon May 27 12:31:17 CDT 2013\n"

The method Object#system executes the given command in a subprocess; it returns true if the command was found and executed properly. It returns nil if the command could not be executed at all (for example, because it cannot be found), and it returns false if the command ran but returned an error. In case of failure, you’ll find the subprocess’s exit code in the global variable $?.

One problem with system is that the command’s output will simply go to the same destination as your program’s output, which may not be what you want. To capture the standard output of a subprocess, you can use the backquote characters, as with `date` in the previous example. Remember that you may need to use String#chomp to remove the line-ending characters from the result.
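The return values and $? can be explored without depending on any particular archive or tool. This sketch runs the current Ruby interpreter itself as the subprocess (located through RbConfig.ruby) and assumes a Unix-like system where the echo command is available; the method names run_statuses and captured_echo are invented for the illustration.

```ruby
require 'rbconfig'

# Returns [result of a succeeding command, result of a failing one, its exit code].
def run_statuses(ruby)
  ok     = system(ruby, "-e", "exit 0")   # command ran and succeeded
  failed = system(ruby, "-e", "exit 3")   # command ran but reported an error
  [ok, failed, $?.exitstatus]             # $? describes the most recent subprocess
end

# Capture standard output with backquotes and strip the trailing newline.
def captured_echo
  `echo hello`.chomp
end

p run_statuses(RbConfig.ruby)   # => [true, false, 3]
p captured_echo                 # => "hello"
```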

OK, this is fine for simple cases—we can run some other process and get the return status. But many times we need a bit more control than that. We’d like to carry on a conversation with the subprocess, possibly sending it data and possibly getting some back. The method IO.popen does just this. The popen method runs a command as a subprocess and connects that subprocess’s standard input and standard output to a Ruby IO object. Write to the IO object, and the subprocess can read it on standard input. Whatever the subprocess writes is available in the Ruby program by reading from the IO object.

For example, on our systems one of the more useful utilities is pig, a program that reads words from standard input and prints them in pig latin (or igpay atinlay). We can use this when our Ruby programs need to send us output that our five-year-olds shouldn’t be able to understand:

pig = IO.popen("local/util/pig", "w+")

pig.puts "ice cream after they go to bed"

pig.close_write

puts pig.gets

Produces:

iceway eamcray afterway eythay ogay otay edbay

This example illustrates both the apparent simplicity and the more subtle real-world complexities involved in driving subprocesses through pipes. The code certainly looks simple enough: open the pipe, write a phrase, and read back the response. But it turns out that the pig program doesn’t flush the output it writes. Our original attempt at this example, which had a pig.puts followed by a pig.gets, hung forever. The pig program processed our input, but its response was never written to the pipe. We had to insert the pig.close_write line. This sends an end-of-file to pig’s standard input, and the output we’re looking for gets flushed as pig terminates.
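Since pig is a local utility, here is a comparable sketch that anyone can run. It uses the Ruby interpreter as the child process, running a one-line script that upcases its standard input; that script and the method name shout are stand-ins chosen for this illustration. As with pig, the child produces nothing until it sees end-of-file, so close_write is essential.

```ruby
require 'rbconfig'

# Send text to a child process over a pipe and return what the child writes back.
def shout(text)
  child = [RbConfig.ruby, "-e", "print STDIN.read.upcase"]
  IO.popen(child, "r+") do |pipe|
    pipe.puts text
    pipe.close_write   # send end-of-file so the child's STDIN.read returns
    pipe.read          # the block's value becomes popen's return value
  end
end

p shout("ice cream after they go to bed")   # => "ICE CREAM AFTER THEY GO TO BED\n"
```

Leaving out the close_write call makes this version hang in the same way, because the child is still waiting for more input.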

popen has one more twist. If the command you pass it is a single minus sign (-), popen will fork a new Ruby interpreter. Both this and the original interpreter will continue running by returning from the popen. The original process will receive an IO object back, and the child will receive nil. This works only on operating systems that support the fork(2) call (and for now this excludes Windows).

pipe = IO.popen("-","w+")
if pipe
  pipe.puts "Get a job!"
  STDERR.puts "Child says '#{pipe.gets.chomp}'"
else
  STDERR.puts "Dad says '#{gets.chomp}'"
  puts "OK"
end

Produces:

Dad says 'Get a job!'

Child says 'OK'

As well as the popen method, some platforms support Object#fork, Object#exec, and IO.pipe. The file-naming convention of many IO methods and Object#open will also spawn subprocesses if you put a | as the first character of the filename (see the introduction to class IO for details). Note that you cannot create pipes using File.new; it’s just for files.
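For platforms that do support them, fork and IO.pipe combine naturally. The following is a sketch under that assumption (it will not run on Windows), with an invented method name message_from_child: the child writes one line into the pipe, and the parent reads it and then collects the child's exit status.

```ruby
# Fork a child that sends one line through a pipe; return the line and exit status.
def message_from_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close                       # the child only writes
    writer.puts "hello from the child"
    writer.close
  end
  writer.close                         # the parent only reads
  message = reader.gets
  reader.close
  Process.wait(pid)                    # reap the child; this also sets $?
  [message.chomp, $?.exitstatus]
end

p message_from_child   # => ["hello from the child", 0]
```

Each process closes the end of the pipe it does not use, so the reader sees end-of-file once the writer is done.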

Independent Children

Sometimes we don’t need to be quite so hands-on; we’d like to give the subprocess its assignment and then go on about our business. Later, we’ll check to see whether it has finished. For instance, we may want to kick off a long-running external sort:

exec("sort testfile > output.txt") if fork.nil?

# The sort is now running in a child process

# carry on processing in the main program

# ... dum di dum ...

# then wait for the sort to finish

Process.wait

The call to Object#fork returns a process ID in the parent and returns nil in the child, so the child process will perform the Object#exec call and run sort. Later, we issue a Process.wait call, which waits for the sort to complete (and returns its process ID).

If you’d rather be notified when a child exits (instead of just waiting around), you can set up a signal handler using Object#trap (described in the reference). Here we set up a trap on SIGCLD, which is the signal sent on “death of child process”:

trap("CLD") do
  pid = Process.wait
  puts "Child pid #{pid}: terminated"
end

fork { exec("sort testfile > output.txt") }

# Do other stuff...

Produces:

Child pid 22026: terminated

For more information on using and controlling external processes, see the documentation for Object#open and IO.popen, as well as the section on the Process module.

Blocks and Subprocesses

IO.popen works with a block in pretty much the same way as File.open does. If you pass it a command, such as date, the block will be passed an IO object as a parameter:

IO.popen("date") {|f| puts "Date is #{f.gets}" }

Produces:

Date is Mon May 27 12:31:17 CDT 2013

The IO object will be closed automatically when the code block exits, just as it is with File.open.

If you associate a block with fork, the code in the block will be run in a Ruby subprocess, and the parent will continue after the block:

fork do
  puts "In child, pid = #$$"
  exit 99
end

pid = Process.wait
puts "Child terminated, pid = #{pid}, status = #{$?.exitstatus}"

Produces:

In child, pid = 22033

Child terminated, pid = 22033, status = 99

$? is a global variable that contains information on the termination of a subprocess. See the section on Process::Status for more information.

Footnotes

[56]

In fact, the nil is not strictly needed, as foreach will return nil when it terminates. The nil just makes it explicit.

[57]

A race condition occurs when two or more pieces of code (or hardware) both try to access some shared resource, and the outcome changes depending on the order in which they do so. In the example here, it is possible for one thread to set the value of its mycount variable to count, but before it gets a chance to increment count, the thread gets descheduled and another thread reuses the same value of count. These issues are fixed by synchronizing the access to shared resources (such as the count variable).

[58]

And, worse, some of these primitives are unsafe in use. Charles Nutter of JRuby fame has a blog post that illustrates one problem: http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html .