Distributed Ruby - THE RUBY WAY, Third Edition (2015)


Chapter 20. Distributed Ruby

Less is more.

—Robert Browning, “Andrea del Sarto”

In the last 25 years, many technologies enabling distributed computing have been devised. These technologies include various flavors of RPC as well as such things as COM, CORBA, DCE, and Java’s RMI.

These all vary in complexity, but they do essentially the same thing. They provide relatively transparent communication between objects in a networking context so that remote objects can be used as though they were local.

Why would we want to do something like this in the first place? There might be many reasons. One excellent reason is to share the burden of a computing problem between many processors at once. An example would be the SETI@home program, which uses your PC to process small data sets in the “search for extraterrestrial intelligence.” (SETI@home is not a project of the SETI Institute, by the way.) Another example would be the grassroots effort to decode the RSA-129 encryption challenge (which succeeded in 1994). There are countless other areas where it is possible to split a problem into individual parts for a distributed solution.

It’s also conceivable that you might want to expose an interface to a service without making the code itself available. This is frequently done via a web application, but the inherently stateless nature of the Web makes this a little unwieldy (in addition to other disadvantages). A distributed programming mechanism makes this kind of thing possible in a more direct way.

In the Ruby world, one answer to this challenge is drb or “distributed Ruby,” by Masatoshi Seki. (The name is also written DRb.) There are other ways of doing distributed coding with Ruby, but drb is arguably the easiest. It doesn’t have such advanced facilities as CORBA’s naming service, but it is a simple and usable library with all the most basic functionality you would need. In this chapter, we’ll look at the basics of using distributed Ruby (along with Rinda, which is built on top of it).

20.1 An Overview: Using drb

A drb application has two basic components—a server and a client. A rough breakdown of their responsibilities follows.

The server has the following responsibilities:

• Starts a TCPServer and listens on a port.

• Binds an object to the drb server instance.

• Accepts connections from clients and responds to their messages.

• May optionally provide access control (security).

And the client has these responsibilities:

• Establishes a connection to the server process.

• Binds a local object to the remote server object.

• Sends messages to the server object and gets responses.

The class method start_service takes care of starting a TCP server that listens on a specified port; it takes two parameters. The first is a URI (Uniform Resource Identifier) specifying a port (if it is nil, a port will be chosen dynamically). The second is the object to which we want to bind, often called the front object. The client can invoke this object's methods remotely as though it were local:

require "drb"

class MyServer        # any ordinary Ruby class will do
  # ... methods the client will call ...
end

myobj = MyServer.new
DRb.start_service("druby://:1234", myobj) # Port 1234

# ...

If the port is chosen dynamically, the class method uri can be used to retrieve the full URI, including the port number:

DRb.start_service(nil, myobj)
myURI = DRb.uri # "druby://hal9000:2001"

Because drb is threaded, any server application will need to do a join on the server thread (to prevent the application from exiting prematurely and killing the thread):

# Prevent premature exit
DRb.thread.join

On the client side, we can invoke start_service with no parameters and use DRbObject to create a local object that corresponds to the remote one. We typically use nil as the first parameter in creating a new DRbObject:

require "drb"

DRb.start_service
obj = DRbObject.new(nil, "druby://hal9000:2001")

# Messages passed to obj will get forwarded to the
# remote object on the server side...

We should point out that on the server side, when we bind to an object, we really are binding to a single object, which will answer all requests that it receives. If there is more than one client, we will have to make our code thread-safe to avoid that object somehow getting into an inconsistent state. (For really simple or specialized applications, this may not be necessary.)

We can’t go into great detail here. Just be aware that if a client both reads and writes the internal state of the remote object, then two or more clients have the potential to interfere with each other. To avoid this, we recommend a straightforward solution using a synchronization mechanism such as a Mutex. (Refer to Chapter 13, “Threads and Concurrency,” for more on threads and synchronization issues.)
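Here is a minimal sketch of the Mutex approach, assuming a hypothetical Counter class as the front object. A drb server handles each client connection on its own thread, so the sketch imitates several clients with plain threads; the mutex makes each read-modify-write step atomic.

```ruby
# Hypothetical front object whose state many clients may touch
class Counter
  def initialize
    @count = 0
    @lock = Mutex.new
  end

  # The read-modify-write happens atomically under the lock
  def increment
    @lock.synchronize { @count += 1 }
  end

  def count
    @lock.synchronize { @count }
  end
end

counter = Counter.new
# A real server would now call:
#   require "drb"
#   DRb.start_service("druby://:1234", counter)

# Imitate eight clients calling in at once
workers = Array.new(8) do
  Thread.new { 1_000.times { counter.increment } }
end
workers.each(&:join)

puts counter.count   # 8000
```

Without the mutex, two threads could read the same old value and both write back value + 1, silently losing an increment.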

Let’s look at security at least a little. After all, you may not want just any old client to connect to your server. You can’t prevent clients from trying, but you can prevent their succeeding.

Distributed Ruby has the concept of an access control list (or ACL, often pronounced to rhyme with “crackle”). These are simply lists of clients (or categories of clients) that are specifically allowed (or not allowed) to connect.

Here is an example: We use the ACL class to create a new ACL, passing in one or two parameters. The second (optional) parameter to ACL.new answers the question, “Do we deny all clients except certain ones, or allow all clients except certain ones?” The default is DENY_ALLOW, represented by a 0, whereas ALLOW_DENY is represented by a 1.

The first parameter for ACL.new is simply an array of strings; these strings are taken in pairs, where the first in the pair is “deny” or “allow,” and the second represents a client or category of clients (by name or address). The following is an example:

require "drb/acl"
acl = ACL.new( %w[ deny all
                   allow 192.168.0.*
                   allow 210.251.121.214
                   allow localhost ] )

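The first entry, deny all, is not redundant. Under the default DENY_ALLOW order, an address that matches no entry at all is allowed, so deny all is what shuts out every client not explicitly listed.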

Now how do we use an ACL? The install_acl method will put an ACL into effect for us. Note that this has to be done before the call to the start_service method, or it will have no effect:

# Continuing the above example...

DRb.install_acl(acl)
DRb.start_service(nil, some_object)
# ...

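When the service then starts, any connection from an unauthorized client is refused: the server drops the connection, and the client's remote call fails with a RuntimeError (in practice a DRb::DRbConnError, which is a subclass of RuntimeError).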
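An ACL can also be checked without starting a server, which is handy for testing a list before deploying it. This sketch calls ACL#allow_addr?, which expects an array shaped like the result of Socket#peeraddr ([family, port, host name, address]). The peer helper and the client addresses are made up for illustration, and we fill the host-name and address fields with the same dotted address.

```ruby
require "drb/acl"

acl = ACL.new(%w[ deny all
                  allow 192.168.0.*
                  allow 210.251.121.214
                  allow localhost ])

# Hypothetical helper: fake a peeraddr-style array for one client
def peer(host, ip)
  ["AF_INET", 40_000, host, ip]
end

lan     = acl.allow_addr?(peer("192.168.0.7", "192.168.0.7"))
listed  = acl.allow_addr?(peer("210.251.121.214", "210.251.121.214"))
unknown = acl.allow_addr?(peer("10.1.2.3", "10.1.2.3"))

puts "192.168.0.7:     #{lan}"
puts "210.251.121.214: #{listed}"
puts "10.1.2.3:        #{unknown}"
```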

There is somewhat more to drb than we cover here, but this is enough for a good overview. In the next section, we’ll look at a simple drb server and client that are almost real-world code. We’ll also look at Rinda and Ring before we close the chapter.

20.2 Case Study: A Stock Ticker Simulation

In this example, we assume that we have a server application that is making stock prices available to the network. Any client wanting to check the value of his or her thousand shares of Gizmonic Institute can contact this server.

We’ve added a twist to this, however. We don’t just want to watch every little fluctuation in the stock price. We’ve implemented an Observer module that will let us subscribe to the stock feed; the client then watches the feed and warns us only when the price goes above or below a certain value.

First, let’s look at the DRbObservable module. This is a straightforward implementation of the Observer pattern from the excellent book Design Patterns, published by Addison-Wesley and authored by the so-called “Gang of Four” (Gamma, Helm, Johnson, and Vlissides). This is also known as the Publish-Subscribe pattern.

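Listing 20.1 defines an observer as an object that responds to the update method call. Observers are added (by the server) at their own request and are sent information via the notify_observers call. An observer whose update call raises an error (for instance, because its client has gone away) is simply dropped from the list.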

Listing 20.1 The drb Observer Module


module DRbObservable

  def observer_peers
    @observer_peers ||= []
  end

  def add_observer(observer)
    unless observer.respond_to? :update
      raise NameError, "observer needs to respond to 'update'"
    end
    observer_peers.push observer
  end

  def delete_observer(observer)
    observer_peers.delete(observer)
  end

  # Iterate over a copy, because a failing peer is removed mid-loop
  def notify_observers(*arg)
    observer_peers.dup.each do |peer|
      peer.update(*arg) rescue delete_observer(peer)
    end
  end
end


The server (or feed) in Listing 20.2 simulates the stock price by a sequence of pseudorandom numbers. (This is as good a simulation of the market as I have ever seen, if you will pardon me for saying so.) The stock symbol identifying the company is used only for cosmetics in the simulation and has no actual purpose in the code. Every time the price changes, the observers are all notified.

Listing 20.2 The drb Stock Price Feed (Server)


require "drb"
require_relative "drb_observer"   # Listing 20.1, saved as drb_observer.rb

# Generate random prices
class MockPrice

  MIN = 75
  RANGE = 50

  def initialize(symbol)
    @price = RANGE / 2
  end

  def price
    @price += (rand() - 0.5)*RANGE
    if @price < 0
      @price = -@price
    elsif @price >= RANGE
      @price = 2*RANGE - @price
    end
    MIN + @price
  end
end

class Ticker # Periodically fetch a stock price
  include DRbObservable

  def initialize(price_feed)
    @feed = price_feed
    Thread.new { run }
  end

  def run
    last_price = nil
    loop do
      price = @feed.price
      puts "Current price: #{price}"
      if price != last_price
        last_price = price
        notify_observers(Time.now, price)
      end
      sleep 1
    end
  end
end

ticker = Ticker.new(MockPrice.new("MSFT"))

DRb.start_service('druby://localhost:9001', ticker)
puts 'Press Control-C to exit.'
DRb.thread.join


Not surprisingly, the client (in Listing 20.3) begins by contacting the server. It gets a reference to the stock ticker object and sets its own desired values for the high and low marks. Then the client will print a message for the user every time the stock price goes above the high end or below the low end.

Listing 20.3 The drb Stock Price Watcher (Client)


require "drb"

class Warner
  include DRbUndumped

  def initialize(ticker, limit)
    @limit = limit
    ticker.add_observer(self) # all warners are observers
  end
end

class WarnLow < Warner
  def update(time, price) # callback for observer
    if price < @limit
      puts "--- #{time}: Price below #@limit: #{price}"
    end
  end
end

class WarnHigh < Warner
  def update(time, price) # callback for observer
    if price > @limit
      puts "+++ #{time}: Price above #@limit: #{price}"
    end
  end
end

DRb.start_service
ticker = DRbObject.new(nil, "druby://localhost:9001")

WarnLow.new(ticker, 90)
WarnHigh.new(ticker, 110)

puts "Press [return] to exit."
gets


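You may wonder about the DRbUndumped module referenced in Listing 20.3. It is included in any class whose instances should not be marshaled; drb then passes such an object by reference (as a DRbObject proxy) instead of sending a copy. That is what lets the server call update on Warner objects that live in the client process. Basically, the mere presence of this module among the ancestors of an object is sufficient to tell drb not to marshal that object. In fact, I recommend you look at the code of this module. Here it is in its entirety: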

module DRbUndumped
  def _dump(dummy)
    raise TypeError, "can't dump"
  end
end
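We can watch that _dump method at work with plain Marshal, no network required. In this sketch the Plain and Pinned classes are hypothetical: a Plain object marshals normally (so drb could send a copy), whereas Marshal.dump on a Pinned object reaches DRbUndumped#_dump and raises TypeError.

```ruby
require "drb"

class Plain
end

class Pinned
  include DRbUndumped   # pass by reference, never by copy
end

copy_ok = begin
  Marshal.dump(Plain.new)
  true
rescue TypeError
  false
end

pinned_ok = begin
  Marshal.dump(Pinned.new)
  true
rescue TypeError => e
  puts "Marshal refused: #{e.message}"   # Marshal refused: can't dump
  false
end

p [copy_ok, pinned_ok]                   # [true, false]
```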

The stock watcher application we saw in this section is long enough to be meaningful but short enough to understand. There are other ways to approach such a problem, but this is a good solution that demonstrates the simplicity and elegance of distributed Ruby.

20.3 Rinda: A Ruby Tuplespace

The term tuplespace dates back as far as 1985, and the concept itself is even older than that. A tuple, of course, is simply an array or vector of data items (much like a database row); a tuplespace is a large object space full of tuples, like a kind of “data soup.”

So far, a tuplespace implementation sounds boring. It becomes more interesting when you realize that it is accessible in a synchronized way by multiple clients. In short, it is inherently a distributed entity; any client can read from or write to the tuplespace, so they can all use it as a large shared storage or even as a way to communicate.

The original tuplespace implementation was the Linda project, an experiment in parallel programming at Yale University in the 1980s. The Ruby implementation based on drb is naturally called Rinda.

A Rinda tuple can actually be an array or a hash. If it is a hash, it has the additional restriction that all its keys must be strings. Here are some simple tuples:

t1 = [:add, 5, 9]
t2 = [:name, :add_service, Adder.new, nil]
t3 = { 'type' => 'add', 'value_1' => 5, 'value_2' => 9 }

There can be any number of items in a tuple. Each item in a tuple can be an arbitrary object; this works because drb can marshal and unmarshal Ruby objects. (Of course, you may need to use DRbUndumped or make the class definitions available on the server side.)

We create a tuplespace with a simple new call:

require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new
# ...

Therefore, a server would simply look like this:

require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new
DRb.start_service("druby://somehost:9000", ts)
DRb.thread.join # Control-C to kill the server

And a client would look like this:

require 'rinda/tuplespace'

DRb.start_service
ts = DRbObject.new(nil, "druby://somehost:9000")
# ...

We can perform five basic operations on a Rinda tuplespace: read, read_all, write, take, and notify. A read operation is exactly what it sounds like: You are retrieving a tuple from tuplespace. However, identifying the tuple to read may be a little unintuitive; we do it by specifying a tuple that will match the one we want to read. A nil value is in effect a wildcard that will match any value.

t1 = ts.read [:sum, nil] # will retrieve [:sum, 14] for example

Normally a read operation will block (as a way of providing synchronization). If you want to do a quick test for the existence of a tuple, you can do a nonblocking read by specifying a timeout value of zero:

t2 = ts.read [:result, nil], 0 # raises Rinda::RequestExpiredError if nonexistent

If we know or expect that more than one tuple will match the pattern, we can use read_all and get an array back:

tuples = ts.read_all [:foo, nil, nil]
tuples.each do |t|
# ...
end

The read_all method doesn’t take a timeout parameter, and it never blocks: if no tuple matches, it simply returns an empty array.
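A local Rinda::TupleSpace (with no drb server at all) is enough to try these calls. The sketch below, using made-up :foo, :bar, and :baz tuples, shows read_all collecting every match, read_all coming back empty when nothing matches, and a zero-timeout read raising Rinda::RequestExpiredError.

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new   # local tuplespace, not exported over drb

ts.write [:foo, 1, "a"]
ts.write [:foo, 2, "b"]
ts.write [:bar, 3, "c"]

foos = ts.read_all [:foo, nil, nil]
p foos.size                           # 2

none = ts.read_all [:baz, nil, nil]
p none                                # []

begin
  ts.read [:baz, nil, nil], 0         # zero timeout: give up at once
  found = true
rescue Rinda::RequestExpiredError
  found = false
end
p found                               # false
```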

A take operation is basically a read followed by an implicit delete. The take actually removes a tuple from the tuplespace and returns it:

t = ts.take [:sum, nil] # tuple is now removed from tuplespace

You might ask why there isn’t an explicit method to perform a delete. The take method serves that purpose.

The write method, of course, stores a tuple in tuplespace. Its second parameter tells how long, in seconds, the tuple should be kept before it expires. (The default expiration value is nil, or never expiring.)

ts.write [:add, 5, 9] # Keep this "forever"
ts.write [:foo, "Bar"], 10 # Keep this ten seconds
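To watch expiration in action, this sketch writes one tuple with a one-second lifetime and another with no lifetime, then counts matches before and after the deadline. The :session and :config tuples are made up for illustration.

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new

ts.write [:session, "abc123"], 1    # expires after about one second
ts.write [:config, "verbose"]       # default: never expires

before = ts.read_all([:session, nil]).size
sleep 1.5                           # wait past the deadline
after  = ts.read_all([:session, nil]).size
kept   = ts.read_all([:config, nil]).size

p [before, after, kept]             # [1, 0, 1]
```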

A few words on synchronization are appropriate here. Suppose two clients attempt to take the same tuple at (approximately) the same time. One will succeed, and the other will block. If the first (successful) client then modifies the tuple and writes it back into tuplespace, the second (blocked) client will then retrieve the new modified version of the tuple. So you can think of an “update” operation as being a take followed by a write, and there will be no data loss (as long as the tuplespace only provides the tuple to one client take at a time).
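Here is a minimal sketch of that take-then-write “update” in a local tuplespace, using a made-up [:counter, n] tuple. Five threads each increment the counter 100 times; because only one take can hold the tuple at a time, no increment is lost.

```ruby
require "rinda/tuplespace"

ts = Rinda::TupleSpace.new
ts.write [:counter, 0]

threads = 5.times.map do
  Thread.new do
    100.times do
      # take blocks while another thread holds the tuple
      _, n = ts.take([:counter, nil])
      ts.write [:counter, n + 1]          # put back the updated tuple
    end
  end
end
threads.each(&:join)

final = ts.read([:counter, nil])
p final                                   # [:counter, 500]
```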

The notify method, not surprisingly, enables you to “watch” the tuplespace and be informed when a matching tuple has some operation performed on it. This method (which returns a NotifyTemplateEntry object) watches for four kinds of operations:

• write operations

• take operations

• delete operations (when a tuple has expired)

• close operations (when the NotifyTemplateEntry object has expired)

Because read operations are nondestructive, the system does not support notification of reads. Listing 20.4 shows an example of using notify.

Listing 20.4 Rinda’s Notification Feature


require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new

alberts = ts.notify "write", ["Albert", nil]
martins = ts.notify "take", ["Martin", nil]

Thread.new do
  alberts.each {|op,t| puts "#{op}: #{t.join(' ')}" }
end

Thread.new do
  martins.each {|op,t| puts "#{op}: #{t.join(' ')}" }
end

sleep 1

ts.write ["Martin", "Luther"]
ts.write ["Albert", "Einstein"]
ts.write ["Martin", "Fowler"]
ts.write ["Albert", "Schweitzer"]
ts.write ["Martin", "Scorsese"]
ts.take ["Martin", "Luther"]

sleep 1 # Give the watcher threads time to print before exiting

# Output:
# write: Albert Einstein
# write: Albert Schweitzer
# take: Martin Luther


We’ve seen how read and other operations use templates that match tuples (conceptually much as a regular expression works). A nil value can be a wildcard, as we’ve seen, but a class can also be specified to match any instance of that class.

tem1 = ["X", Integer] # matches ["X",5] but not ["X","Files"]
tem2 = ["X", NilClass] # matches a literal nil in the tuple

In addition, you can define your own case equality (===) operator if you want a class to match a value in some special way. Otherwise, of course, the class will match based on the default === operator.
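The following sketch tries several kinds of template element against a local tuplespace: a class, NilClass, a Range (whose === tests membership), and a hypothetical EvenNumber class that defines its own === as a class method.

```ruby
require "rinda/tuplespace"

# Hypothetical matcher: its === decides which values it matches
class EvenNumber
  def self.===(other)
    other.is_a?(Integer) && other.even?
  end
end

ts = Rinda::TupleSpace.new
ts.write ["X", 5]
ts.write ["X", "Files"]
ts.write ["X", nil]
ts.write ["X", 42]

ints  = ts.read_all ["X", Integer]      # matches 5 and 42
nils  = ts.read_all ["X", NilClass]     # matches only the literal nil
evens = ts.read_all ["X", EvenNumber]   # custom === picks 42
small = ts.read_all ["X", 1..10]        # Range#=== picks 5

p ints.size                             # 2
p nils                                  # [["X", nil]]
p evens                                 # [["X", 42]]
p small                                 # [["X", 5]]
```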

Bear in mind that the lifetime of a tuple can be specified upon writing. This ties in with the timeout values on the various tuple operations, ensuring that it’s possible to restrict any simple or complex operation to a finite length of time.

The fact that tuples can expire also means that their lifetime can be extended, often with a custom renewer object passed in place of a number of seconds; when the tuple's time runs out, the tuplespace asks the renewer whether to keep it alive. The library comes with a SimpleRenewer that simply contacts the tuple’s originating drb server every 180 seconds; if the server is down, the tuple is allowed to expire. However, don’t bother with renewer objects until you are competent in the tuplespace paradigm.

Rounding out our Rinda examples, Listing 20.5 shows a version of the producer/consumer example from Chapter 13 using a TupleSpace.

Listing 20.5 The Producer-Consumer Problem Revisited


require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new

producer = Thread.new do
  item = 0
  loop do
    sleep rand(0)
    puts "Producer makes item ##{item}"
    ts.write ["Item",item]
    item += 1
  end
end

consumer = Thread.new do
  loop do
    sleep rand(0)
    tuple = ts.take ["Item", nil]
    _, item = tuple
    puts "Consumer retrieves item ##{item}"
  end
end

sleep 60 # Run a minute, then die and kill threads


20.4 Service Discovery with Distributed Ruby

If you have many services running locally, service discovery might be a useful concept to you; it allows services to be located by name. However, if you have few services at well-known locations, this may not be particularly useful.

The Rinda library provides a service discovery facility (naturally built on a Rinda tuplespace) called Rinda::Ring. Think of it as providing DNS-like features; it is a central registration service storing information (in a tuplespace) about drb processes. The drb services can use UDP (datagrams) to find a nearby registration server to advertise themselves and/or to find other services in the neighborhood.

The Rinda::RingServer class implements the registration server. It keeps a tuplespace for storing locations of other drb services (although, actually, any service may be advertised). When it comes up, the RingServer listens for UDP broadcast packets requesting the location of the server. It responds to each of these by connecting back (via drb) to the requesting service. The following is an example of running a RingServer:

require 'rinda/ring'
require 'rinda/tuplespace'

DRb.start_service
Rinda::RingServer.new(Rinda::TupleSpace.new)
DRb.thread.join

The Rinda::RingProvider class is used to register (or advertise) a service with a RingServer. A service is identified by a service type, a front-end object providing the service, and a piece of descriptive text. In the following example, we create a simple Adder service that adds two numbers; then we advertise it to the world:

require 'rinda/ring'

class Adder
  include DRbUndumped

  def add(val1, val2)
    val1 + val2
  end
end

adder = Adder.new
DRb.start_service(nil, adder)
Rinda::RingProvider.new(:adder, adder, 'Simple Adder')
DRb.thread.join

The Rinda::RingFinger class (presumably named for the classic UNIX finger command) can be used to locate a RingServer. It sends a UDP broadcast packet and waits for the RingServer to respond. The tuplespace of the first RingServer to respond becomes available via the primary method. Any other discovered servers are available via the to_a method. The returned tuplespace can then be used to look up advertised services:

require 'rinda/ring'

DRb.start_service
rs = Rinda::RingFinger.primary
every_space = [rs] + Rinda::RingFinger.to_a
# Keep only the tuplespaces that advertise an :adder service
spaces = every_space.find_all do |ts|
  ts.read([:name, :adder, nil, nil], 0) rescue nil
end

20.5 Conclusion

This chapter presented a good introduction to distributed Ruby. We saw the basics of starting a service and communicating with clients, and we looked at security issues.

In addition, we saw how Rinda can act as a simple object store that is both distributed and synchronized. Finally, we looked at how Rinda::Ring can be used for drb service discovery.

That ends our look at distributed Ruby. Let’s move on to a new topic: using development tools associated with Ruby, such as rake, irb, and others.