Chapter 8. Concurrency

QUESTIONS YOU’LL BE ABLE TO ANSWER AFTER THIS CHAPTER

§ What is concurrency and how is it helpful?

§ What is the difference between concurrency and parallelism?

§ Which tasks can be done concurrently and which can’t?

§ What are the various paradigms for concurrency?

§ When is the right time to take advantage of concurrency?

§ How can concurrency speed up my programs?

I/O can be quite burdensome to the flow of a program. Every time your code reads from a file or writes to a network socket, it must pause to contact the kernel, request that the operation happen, and then wait for it to complete. This may not seem like the end of the world, especially after you realize that a similar operation happens every time memory is allocated; however, if we look back to Figure 1-3 we see that most of the I/O operations we perform are on devices that are orders of magnitude slower than the CPU.

For example, in the time it takes to write to a network socket, an operation that typically takes about 1 ms, we could have completed 2,400,000 instructions on a 2.4 GHz computer. Worst of all, our program is halted for much of this 1 ms time—our execution is paused and we are waiting for a signal that the write operation has completed. This time spent in a paused state is called “I/O wait.”

Concurrency helps us utilize this wasted time by allowing us to perform other operations while waiting for an I/O operation to complete. For example, in Figure 8-1 we see a depiction of a program that must run three tasks, all of which have periods of I/O wait within them. If we run them serially, then we suffer the I/O wait penalty three times. However, if we run these tasks concurrently we can essentially hide the wait time by running another task in the meantime. It is important to note that this is all still happening on a single thread and still only uses one CPU at a time!

While concurrency isn’t limited to I/O, this is where we see the greatest benefits. In a concurrent program, instead of having your code run serially—that is, from one line to the next—your code is written to handle events, and different parts of your code run when different events happen.

By modeling a program in this way, we are able to deal with the particular event that we are concerned with: I/O wait.

Figure 8-1. Comparison between serial and concurrent programs

Introduction to Asynchronous Programming

When a program enters I/O wait, the execution is paused so that the kernel can perform the low-level operations associated with the I/O request (this is called a context switch) and is not resumed until the I/O operation is completed. Context switching is quite a heavy operation. It requires us to save the state of our program (losing any sort of caching we had at the CPU level) and give up the use of the CPU. Later, when we are allowed to run again, we must spend time reinitializing our program on the motherboard and getting ready to resume (of course, all this happens behind the scenes).

With concurrency, on the other hand, we typically have an “event loop” running that manages what gets to run in our program, and when. In essence, an event loop is simply a list of functions that need to be run: the function at the top of the list gets run, then the next, and so on. Example 8-1 shows a simple example of an event loop.

Example 8-1. Toy event loop

from Queue import Queue
from functools import partial

eventloop = None

class EventLoop(Queue):
    def start(self):
        while True:
            function = self.get()
            function()

def do_hello():
    global eventloop
    print "Hello"
    eventloop.put(do_world)

def do_world():
    global eventloop
    print "world"
    eventloop.put(do_hello)

if __name__ == "__main__":
    eventloop = EventLoop()
    eventloop.put(do_hello)
    eventloop.start()

This may not seem like a big change; however, we can couple event loops with asynchronous (async) I/O operations for massive gains when performing I/O tasks. These operations are nonblocking, meaning if we do a network write with an async function, it will return right away even though the write has not happened yet. When the write has completed, an event fires so our program knows about it.

Putting these two concepts together, we can have a program that, when an I/O operation is requested, runs other functions while waiting for the original I/O operation to complete. This essentially allows us to still do meaningful calculations while we would otherwise have been in I/O wait.
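
To make “nonblocking” concrete, here is a minimal sketch (not from the book) using only the standard library’s socket and select modules. The address and request string are illustrative; a real event loop would perform the readiness check for many sockets at once instead of the single explicit select call shown here:

import select
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)                # calls now return immediately instead of waiting
try:
    sock.connect(("127.0.0.1", 8080))  # illustrative address; returns right away
except socket.error:
    pass                               # "in progress" is expected for a nonblocking connect
# Wait until the socket is writable; an event loop would watch many sockets
# like this one and run other code in the meantime.
_, writable, _ = select.select([], [sock], [], 1.0)
if writable:
    sock.send(b"GET /add?name=test&delay=100 HTTP/1.0\r\n\r\n")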

NOTE

Switching from function to function does have a cost. The kernel must take the time to set up the function to be called in memory, and the state of our caches won’t be as predictable. It is because of this that concurrency gives the best results when your program has a lot of I/O wait—while this switching does have a cost to it, it can be much less than what is gained by making use of I/O wait time.

Programming using event loops can take two forms: callbacks or futures. In the callback paradigm, functions are called with an argument that is generally called the callback. Instead of the function returning its value, it calls the callback function with the value instead. This sets up long chains of functions that are called, with each getting the result of the previous function in the chain. Example 8-2 is a simple example of the callback paradigm.

Example 8-2. Example with callbacks

from functools import partial

def save_value(value, callback):
    print "Saving {} to database".format(value)
    save_result_to_db(value, callback)  # 1

def print_response(db_response):
    print "Response from database: {}".format(db_response)

if __name__ == "__main__":
    eventloop.put(
        partial(save_value, "Hello World", print_response)
    )

1. save_result_to_db is an asynchronous function; it will return immediately and the function will end and allow other code to run. However, once the data is ready, print_response will be called.

With futures, on the other hand, an asynchronous function returns a promise of a future result instead of the actual result. Because of this, we must wait for the future that is returned by this sort of asynchronous function to complete and be filled with the value we desire (either by doing a yield on it or by running a function that explicitly waits for a value to be ready). While waiting for the future object to be filled with the data we requested, we can do other calculations. If we couple this with the concept of generators—functions that can be paused and whose execution can later be resumed—we can write asynchronous code that looks very close to serial code in form:

@coroutine
def save_value(value):
    print "Saving {} to database".format(value)
    db_response = yield save_result_to_db(value)  # 1
    print "Response from database: {}".format(db_response)

if __name__ == "__main__":
    eventloop.put(
        partial(save_value, "Hello World")
    )

1. In this case, save_result_to_db returns a Future type. By yielding from it, we ensure that save_value gets paused until the value is ready, then resumes and completes its operations.

In Python, coroutines are implemented as generators. This is convenient because generators already have the machinery to pause their execution and resume later. So, what happens is our coroutine will yield a future and the event loop will wait until that future has its value ready. Once this happens, the event loop will resume execution of that function, sending back to it the value of the future.
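
As an illustration (not the book’s code), here is a minimal sketch of that interaction: a coroutine yields a bare-bones Future, and a toy loop fills the future in and then sends the result back into the paused generator:

class Future(object):
    def __init__(self):
        self.result = None

def fetch_coroutine():
    future = Future()
    # A real async library would return this future from an I/O call and
    # fire an event when the data arrives.
    result = yield future              # pause here until the loop sends a value
    print("Received: {}".format(result))

def toy_loop(coroutine):
    future = next(coroutine)           # run up to the first yield, collect the future
    future.result = "fake response"    # pretend the I/O operation has completed
    try:
        coroutine.send(future.result)  # resume the coroutine with the future's value
    except StopIteration:
        pass

toy_loop(fetch_coroutine())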

For Python 2.7 implementations of future-based concurrency, things can get a bit strange when we’re trying to use coroutines as actual functions. Remember that generators cannot return values, so there are various ways libraries deal with this issue. In Python 3.4, however, new machinery has been introduced in order to easily create coroutines and have them still return values.

In this chapter we will analyze a web crawler that fetches data from an HTTP server that has latency built into it. This represents the general response time latency that will occur whenever we’re dealing with I/O. We will first create a serial crawler that looks like the naive Python solution to this problem. Then we will go through two solutions in Python 2.7: gevent and tornado. Finally, we will look at the asyncio library in Python 3.4 and see what the future of asynchronous programming in Python looks like.

NOTE

The web server we implemented can support multiple connections at a time. This will be true for most services that you will be performing I/O with—most databases can support multiple requests at a time, and most web servers support 10K+ simultaneous connections. However, when interacting with a service that cannot handle multiple connections at a time,[18] we will always have the same performance as the serial case.

Serial Crawler

For the control in our experiment with concurrency we will write a serial web scraper that takes a list of URLs, fetches them, and sums the total length of the content from the pages. We will use a custom HTTP server that takes two parameters, name and delay. The delay field will tell the server how long, in milliseconds, to pause before responding. The name field is simply for logging purposes.

By controlling the delay parameter, we can simulate the time it takes a server to respond to our query. In the real world, this could correspond to a slow web server, a strenuous database call, or any I/O call that takes a long time to perform. For the serial case, this simply represents more time that our program will be stuck in I/O wait, but in the concurrent examples later it will represent more time the program can spend doing other things.
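
The server itself is not listed in this chapter. As a rough sketch (not the book’s implementation, and assuming a Tornado version that provides gen.sleep, 4.1+), a server with the described name/delay interface could look like this:

import tornado.gen
import tornado.ioloop
import tornado.web

class AddHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        name = self.get_argument("name", "unknown")
        delay = int(self.get_argument("delay", "0"))
        # Sleep asynchronously so the server can service many requests at once
        yield tornado.gen.sleep(delay / 1000.0)
        self.write("{}: waited {} ms".format(name, delay))

if __name__ == "__main__":
    app = tornado.web.Application([(r"/add", AddHandler)])
    app.listen(8080)
    tornado.ioloop.IOLoop.instance().start()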

In addition, we chose to use the requests module to perform the HTTP calls because of its simplicity. We use HTTP in this section because it is a simple example of I/O and HTTP requests are easy to perform. In general, any call to an HTTP library can be replaced with any other I/O operation. The serial version of our HTTP crawler is shown in Example 8-3.

Example 8-3. Serial HTTP scraper

import requests
import string
import random

def generate_urls(base_url, num_urls):
    """
    We add random characters to the end of the URL to break any caching
    mechanisms in the requests library or the server
    """
    for i in xrange(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))

def run_experiment(base_url, num_iter=500):
    response_size = 0
    for url in generate_urls(base_url, num_iter):
        response = requests.get(url)
        response_size += len(response.text)
    return response_size

if __name__ == "__main__":
    import time
    delay = 100
    num_iter = 500
    base_url = "http://127.0.0.1:8080/add?name=serial&delay={}&".format(delay)

    start = time.time()
    result = run_experiment(base_url, num_iter)
    end = time.time()
    print("Result: {}, Time: {}".format(result, end - start))

When running this code, an interesting metric to look at is the start and stop time of each request as seen by the HTTP server. This tells us how efficient our code was during I/O wait—since our task is simply to launch HTTP requests and then sum the number of characters that were returned, we should be able to launch more HTTP requests, and process any responses, while waiting for other requests to complete.

We can see from Figure 8-2 that, as expected, there is no interleaving of our requests. We do one request at a time and wait for the previous request to complete before we move to the next request. In fact, the total runtime of the serial process makes perfect sense, knowing this: since each request takes 0.1s (because of our delay parameter) and we are doing 500 requests, we expect the total runtime to be about 50 seconds.

Figure 8-2. Chronology of HTTP requests for Example 8-3

gevent

One of the simplest asynchronous libraries is gevent. It follows the paradigm of having asynchronous functions return futures, which means most of the logic in your code can stay the same. In addition, gevent monkey-patches the standard I/O functions to be asynchronous, so most of the time you can simply use the standard I/O packages and benefit from asynchronous behavior.

gevent provides two mechanisms to enable asynchronous programming—as we’ve just mentioned, it patches the standard library with asynchronous I/O functions, and it also has a Greenlet object that can be used for concurrent execution. A greenlet is a type of coroutine and can be thought of as a thread (see Chapter 9 for a discussion of threads); however, all greenlets run on the same physical thread. That is, instead of using multiple CPUs to run all the greenlets, gevent’s scheduler switches between them during I/O wait by use of an event loop. For the most part, gevent tries to make the handling of the event loop as transparent as possible through the use of wait functions. The wait function will start an event loop and run it as long as needed, until all the greenlets have finished. Because of this, most of your gevent code will run serially; then, at some point you will set up many greenlets to do some concurrent task, and start the event loop with the wait function. While the wait function is executing, all of the concurrent tasks you have queued up will run until completion (or some stopping condition), and then your code will go back to being serial again.

The futures are created with gevent.spawn, which takes a function and the arguments to that function and launches a greenlet that is responsible for running that function. The greenlet can be thought of as a future, since once the function you’ve specified completes, its value will be contained within the greenlet’s value field.
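
A minimal sketch of this behavior (the fetch_fake function and URL are purely illustrative, not the book’s code):

import gevent

def fetch_fake(url):
    gevent.sleep(0.1)                  # stand-in for an asynchronous I/O wait
    return "response for " + url

greenlet = gevent.spawn(fetch_fake, "http://127.0.0.1:8080/")
gevent.wait([greenlet])                # run the event loop until the greenlet finishes
print(greenlet.value)                  # the return value of fetch_fake lives here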

This patching of Python standard modules can make it harder to control the subtleties of which asynchronous functions get run, and when. For example, one thing we want to make sure of when doing async I/O is that we don’t open too many files or connections at one time. If we do this, we can overload the remote server or slow down our process by having to context-switch between too many operations. It wouldn’t be as efficient to launch as many greenlets as we have URLs to fetch; we need a mechanism to limit how many HTTP requests we make at a time.

We can control the number of concurrent requests manually by using a semaphore to only do HTTP gets from 100 greenlets at a time. A semaphore works by making sure that only a certain number of coroutines can enter the context block at a time. As a result, we can launch all the greenlets that we need in order to fetch the URLs right away, but only 100 of them will be able to make HTTP calls at a time. Semaphores are one type of locking mechanism used a lot in various parallel code flows. By restricting the progression of your code based on various rules, locks can help you make sure that the various components of your program don’t interfere with each other.
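
For example, here is a minimal sketch of a semaphore used as a context manager (using gevent.lock.Semaphore, which older gevent versions exposed as gevent.coros.Semaphore, the import used in Example 8-4):

import gevent
from gevent.lock import Semaphore

semaphore = Semaphore(2)               # at most two greenlets inside the block at once

def worker(n):
    with semaphore:                    # blocks (yields to the event loop) while full
        print("worker {} running".format(n))
        gevent.sleep(0.1)              # stand-in for an HTTP request

gevent.wait([gevent.spawn(worker, i) for i in range(5)])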

Now that we have all the futures set up and have put in a locking mechanism to control the flow of the greenlets, we can wait until we start having results by using the gevent.iwait function, which will take a sequence of futures and iterate over the ready items. Conversely, we could have used gevent.wait, which would block execution of our program until all requests are done.

We go through the trouble of chunking our requests instead of sending them all at once because overloading the event loop can cause performance decreases (and this is true for all asynchronous programming). From experimentation, we generally see that 100 or so open connections at a time is optimal (see Figure 8-3). If we were to use fewer, we would still have wasted time during I/O wait. With more, we would be switching contexts too often in the event loop and adding unnecessary overhead to our program. That being said, this value of 100 depends on many things—the computer the code is being run on, the implementation of the event loop, the properties of the remote host, the expected response time of the remote server, etc. We recommend doing some experimentation before settling on a choice. Example 8-4 shows the code for the gevent version of our HTTP crawler.

Figure 8-3. Finding the right number of concurrent requests

Example 8-4. gevent HTTP scraper

from gevent import monkey
monkey.patch_socket()

import gevent
from gevent.coros import Semaphore
import urllib2
import string
import random

def generate_urls(base_url, num_urls):
    for i in xrange(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))

def chunked_requests(urls, chunk_size=100):
    semaphore = Semaphore(chunk_size)  # 1
    requests = [gevent.spawn(download, u, semaphore) for u in urls]  # 2
    for response in gevent.iwait(requests):
        yield response

def download(url, semaphore):
    with semaphore:  # 3
        data = urllib2.urlopen(url)
        return data.read()

def run_experiment(base_url, num_iter=500):
    urls = generate_urls(base_url, num_iter)
    response_futures = chunked_requests(urls, 100)  # 4
    response_size = sum(len(r.value) for r in response_futures)
    return response_size

if __name__ == "__main__":
    import time
    delay = 100
    num_iter = 500
    base_url = "http://127.0.0.1:8080/add?name=gevent&delay={}&".format(delay)

    start = time.time()
    result = run_experiment(base_url, num_iter)
    end = time.time()
    print("Result: {}, Time: {}".format(result, end - start))

1. Here we generate a semaphore that lets chunk_size downloads happen.

2. We can queue up as many greenlets as we need, knowing that none of them will run until we start an event loop with wait or iwait.

3. By using the semaphore as a context manager, we ensure that only chunk_size greenlets can run the body of the context at a time.

4. response_futures now holds an iterator of completed futures, all of which have our desired data in the .value property.

Alternatively, we can use grequests to greatly simplify our gevent code. While gevent provides all sorts of lower-level concurrent socket operations, grequests is a combination of the requests HTTP library and gevent; the result is a very simple API for making concurrent HTTP requests (it even handles the semaphore logic for us). With grequests, our code becomes a lot simpler, more understandable, and more maintainable, while still resulting in comparable speedups to the lower-level gevent code (see Example 8-5).

Example 8-5. grequests HTTP scraper

import grequests

def run_experiment(base_url, num_iter=500):
    urls = generate_urls(base_url, num_iter)
    response_futures = (grequests.get(u) for u in urls)  # 1
    responses = grequests.imap(response_futures, size=100)  # 2
    response_size = sum(len(r.text) for r in responses)
    return response_size

1. First we create the requests and get futures back. We chose to do this as a generator so that later we only need to evaluate as many requests as we are ready to issue.

2. Now we can take the future objects and map them into real response objects. The .imap function gives us a generator that yields response objects for which we have retrieved data.

An important thing to note is that we have used gevent and grequests to make our I/O requests asynchronous, but we are not doing any non-I/O computations while in I/O wait. Figure 8-4 shows the massive speedup we get. By launching more requests while waiting for previous requests to finish, we are able to achieve a 69x speed increase! We can explicitly see how new requests are being sent out before previous requests finish by the way the horizontal lines representing the requests stack on each other. This is in sharp contrast to the case of the serial crawler (Figure 8-2), where a line only starts when the previous line finishes. Furthermore, we can see more interesting effects in the shape of the gevent request timeline. For example, around the 100th request we see a pause where no new requests are launched. This is because it is the first time our semaphore limit is hit, and we are able to lock the semaphore before any previous requests finish. After this, the semaphore reaches an equilibrium: it locks just as another request finishes and unlocks it.

Figure 8-4. Chronology of HTTP requests for Example 8-5

tornado

Another very frequently used package for asynchronous I/O in Python is tornado, a package originally developed at FriendFeed and now maintained by Facebook, used primarily for HTTP clients and servers. In contrast to gevent, tornado chooses to use the callback method for async behavior. However, in the 3.x release, coroutine-like behavior was added in a way that is compatible with old code.

In Example 8-6, we implement the same web crawler as we did for gevent, but using the tornado I/O loop (its version of an event loop) and HTTP client. This saves us the trouble of having to batch our requests and deal with other, more low-level aspects of our code.

Example 8-6. tornado HTTP scraper

from tornado import ioloop
from tornado.httpclient import AsyncHTTPClient
from tornado import gen

from functools import partial
import string
import random

AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient",
                          max_clients=100)  # 1

def generate_urls(base_url, num_urls):
    for i in xrange(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))

@gen.coroutine
def run_experiment(base_url, num_iter=500):
    http_client = AsyncHTTPClient()
    urls = generate_urls(base_url, num_iter)
    responses = yield [http_client.fetch(url) for url in urls]  # 2
    response_sum = sum(len(r.body) for r in responses)
    raise gen.Return(value=response_sum)  # 3

if __name__ == "__main__":
    # ... initialization ...
    _ioloop = ioloop.IOLoop.instance()
    run_func = partial(run_experiment, base_url, num_iter)
    result = _ioloop.run_sync(run_func)  # 4

1. We can configure our HTTP client and pick what backend library we wish to use and how many requests we would like to batch together.

2. We generate many futures and then yield back to the I/O loop. This function will resume, and the responses variable will be filled with the results of all the futures once they are ready.

3. Coroutines in tornado are backed by Python generators. In order to return a value from them, we must raise a special exception that gen.coroutine turns into a return value.

4. ioloop.run_sync will start the IOLoop just for the duration of the runtime of the specified function. ioloop.start(), on the other hand, starts an IOLoop that must be terminated manually.

An important difference between the tornado code from Example 8-6 and the gevent code from Example 8-4 is when the event loop runs. For gevent, the event loop is only running while the iwait function is running. On the other hand, in tornado the event loop is running the entire time and controls the complete execution flow of the program, not just the asynchronous I/O parts.

This makes tornado ideal for applications that are mostly I/O-bound and where most, if not all, of the application should be asynchronous. This is where tornado makes its biggest claim to fame, as a performant web server. In fact, Micha has on many occasions written tornado-backed databases and data structures that require a lot of I/O.[19] On the other hand, since gevent makes no requirements of your program as a whole, it is an ideal solution for mainly CPU-bound problems that sometimes involve heavy I/O—for example, a program that does a lot of computations over a dataset and then must send the results back to the database for storage. This becomes even simpler given that most databases have simple HTTP APIs, which means you can even use grequests.

We can see just how much control the tornado event loop has if we look at the older-style tornado code that utilizes callbacks in Example 8-7. We can see that in order to start the code we must add the entry point for our program into the I/O loop, and then start it. Then, in order for the program to terminate, we must carefully carry around the stop function for our I/O loop and call it when appropriate. As a result, programs that must explicitly carry callbacks become incredibly burdensome and quickly unmaintainable. One reason this happens is that tracebacks can no longer hold valuable information about what function called what and how we got into an exception to begin with. Even simply knowing which functions are called at all can become hard, since we are constantly making partial functions to fill in parameters. It is no surprise that this is often called “callback hell.”

Example 8-7. tornado crawler with callbacks

from tornado import ioloop
from tornado.httpclient import AsyncHTTPClient
from functools import partial

AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient",
                          max_clients=100)

def fetch_urls(urls, callback):
    http_client = AsyncHTTPClient()
    urls = list(urls)
    responses = []

    def _finish_fetch_urls(result):  # 1
        responses.append(result)
        if len(responses) == len(urls):
            callback(responses)

    for url in urls:
        http_client.fetch(url, callback=_finish_fetch_urls)

def run_experiment(base_url, num_iter=500, callback=None):
    urls = generate_urls(base_url, num_iter)
    callback_passthrou = partial(_finish_run_experiment,
                                 callback=callback)  # 2
    fetch_urls(urls, callback_passthrou)

def _finish_run_experiment(responses, callback):
    response_sum = sum(len(r.body) for r in responses)
    print response_sum
    callback()

if __name__ == "__main__":
    # ... initialization ...
    _ioloop = ioloop.IOLoop.instance()
    _ioloop.add_callback(run_experiment, base_url, num_iter, _ioloop.stop)  # 3
    _ioloop.start()

1. Sometimes games with scope are a necessary evil, in order to preserve state while not cluttering the global namespace.

2. Callback-type async code involves a lot of partial function creation. This is because we often need to preserve the original callback we were sent, even though we currently need to transfer the runtime to another function.

3. We send _ioloop.stop as the callback to run_experiment so that once the experiment is done, it shuts off the I/O loop for us.

Another interesting difference between gevent and tornado is the way the internals change the request call graphs. Compare Figure 8-5 with Figure 8-4. For the gevent call graph, we see some areas where the diagonal line seems to get thinner, and others where it seems to get much thicker. The thinner regions show times when we are waiting for old requests to finish before launching new ones. The thicker regions represent areas where we are too busy to read the responses from requests that should have already finished. Both these types of regions represent times when the event loop isn’t doing its job optimally: times when we are either underutilizing or overutilizing our resources.

On the other hand, the call graph for tornado is much more uniform. This shows that tornado is better able to optimize our resource use. This can be attributed to many things. A contributing factor here is that because the semaphore logic limiting the number of concurrent requests to 100 is internal to tornado, it can better allocate resources. This includes preallocating and reusing connections in a smarter way. In addition, there are many smaller effects resulting from the modules’ choices with regard to their communications with the kernel in order to coordinate receiving results from asynchronous operations.

Figure 8-5. Chronology of HTTP requests for Example 8-6

AsyncIO

In response to the popularity of using async functionality to deal with heavy-I/O systems, Python 3.4+ introduced the asyncio standard library module, a thorough revamp of Python’s built-in support for asynchronous I/O. This module draws much of its influence from the gevent and tornado methods of concurrency, where coroutines are defined and yielded from in order to halt the execution of the current function and allow other coroutines to run. As in tornado, the event loop is explicitly started in order to start the execution of the coroutines. In addition, Python 3 introduced new yield from syntax, which greatly simplifies dealing with these coroutines (we no longer have to raise an exception to return a value from a coroutine, as we did in Example 8-6).
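
As a minimal sketch of the difference (assuming Python 3.4 and nothing beyond the standard library), the coroutine below simply returns its value, and yield from hands that value back to the caller with no gen.Return-style exception:

import asyncio

@asyncio.coroutine
def compute():
    yield from asyncio.sleep(0.1)      # stand-in for an asynchronous I/O wait
    return 42                          # generators may return values in Python 3.3+

@asyncio.coroutine
def main():
    result = yield from compute()      # resumes with the returned value
    print("Got: {}".format(result))

asyncio.get_event_loop().run_until_complete(main())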

It is important to note that the asyncio library is very low-level and does not provide much higher-level functionality to the user. For example, while there is a very full socket API, there is no easy way to do HTTP requests. As a result, we chose to use the aiohttp library in Example 8-8. However, the adoption of the asyncio library is just starting to ramp up, and the landscape of helper modules is probably going to be changing very quickly.

Example 8-8. asyncio HTTP scraper

import asyncio
import aiohttp
import random
import string

def generate_urls(base_url, num_urls):
    for i in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))

def chunked_http_client(num_chunks):
    semaphore = asyncio.Semaphore(num_chunks)  # 1

    @asyncio.coroutine
    def http_get(url):  # 2
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
            return body
    return http_get

def run_experiment(base_url, num_iter=500):
    urls = generate_urls(base_url, num_iter)
    http_client = chunked_http_client(100)
    tasks = [http_client(url) for url in urls]  # 3
    responses_sum = 0
    for future in asyncio.as_completed(tasks):  # 4
        data = yield from future
        responses_sum += len(data)
    return responses_sum

if __name__ == "__main__":
    import time
    delay = 100
    num_iter = 500
    base_url = "http://127.0.0.1:8080/add?name=asyncio&delay={}&".format(delay)

    loop = asyncio.get_event_loop()
    start = time.time()
    result = loop.run_until_complete(run_experiment(base_url, num_iter))
    end = time.time()
    print("{} {}".format(result, end - start))

1. As in the gevent example, we must use a semaphore to limit the number of requests.

2. We return a new coroutine that will asynchronously download files and respect the locking of the semaphore.

3. The http_client function returns futures. To keep track of progress, we save the futures into a list.

4. As with gevent, we can wait for futures to become ready and iterate over them.

One of the fantastic benefits of the asyncio module is that its API feels familiar next to the rest of the standard library, which simplifies making helper modules. We are able to get the same sort of results as we would with tornado or gevent, but if we wanted to, we could dive deeper into the stack and make our own async protocols using a wide array of supported structures. In addition, because it is a standard library module, we are assured that this module will always be PEP-compliant and reasonably maintained.[20]

Furthermore, the asyncio library allows us to unify modules like tornado and gevent by having them run in the same event loop. In fact, the Python 3.4 version of tornado is backed by the asyncio library. As a result, even though tornado and gevent have different use cases, the underlying event loop will be unified, which will make changing from one paradigm to the other mid-code trivial. You can even make your own wrappers on top of the asyncio module quite easily, in order to interact with asynchronous operations in the most efficient way possible for the problem you are solving.

Although it’s only supported in Python 3.4 and higher,[21] this module at the very least is a great sign of more work being put into asynchronous I/O in the future. As Python starts dominating more and more processing pipelines (from data processing to web request processing), this shift makes perfect sense.

Figure 8-6 shows the request timeline for the asyncio version of our HTTP scraper.

Figure 8-6. Chronology of HTTP requests for Example 8-8

Database Example

To make the preceding examples more concrete, we will create another toy problem that is mostly CPU-bound but contains a potentially limiting I/O component. We will be calculating primes and saving the found primes into a database. The database could be anything, and the problem is representative of any sort of problem where your program has heavy calculations to do, and the results of those calculations must be stored into a database, potentially incurring a heavy I/O penalty. The only restrictions we are putting on our database are:

§ It has an HTTP API so that we can use code like that in the earlier examples.[22]

§ Response times are on the order of 50 ms.

§ The database can satisfy many requests at a time.[23]

We start with some simple code that calculates primes and makes a request to the database’s HTTP API every time a prime is found:

from tornado.httpclient import HTTPClient
import math

httpclient = HTTPClient()

def save_prime_serial(prime):
    url = "http://127.0.0.1:8080/add?prime={}".format(prime)
    response = httpclient.fetch(url)
    finish_save_prime(response, prime)

def finish_save_prime(response, prime):
    if response.code != 200:
        print "Error saving prime: {}".format(prime)

def check_prime(number):
    if number % 2 == 0:
        return False
    for i in xrange(3, int(math.sqrt(number)) + 1, 2):
        if number % i == 0:
            return False
    return True

def calculate_primes_serial(max_number):
    for number in xrange(max_number):
        if check_prime(number):
            save_prime_serial(number)
    return

Just as in our serial example (Example 8-3), the request times for the database saves (50 ms each) do not interleave, and we must pay this penalty for each prime we find. As a result, searching up to max_number = 8,192 (which results in 1,028 primes) takes 55.2s. We know, however, that because of the way our serial requests work, we are spending 51.4s at minimum doing I/O (1,028 primes × 50 ms per request)! So, simply because we are pausing our program while doing I/O, we are wasting 93% of our time.

What we want to do instead is to find a way to change our request scheme so that we can issue many requests asynchronously at a time, so that we don’t have such a burdensome I/O wait. In order to do this, we create an AsyncBatcher class that takes care of batching requests for us and making the requests when necessary:

import grequests
from itertools import izip

class AsyncBatcher(object):
    __slots__ = ["batch", "batch_size"]

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.batch = []

    def save(self, prime):
        url = "http://127.0.0.1:8080/add?prime={}".format(prime)
        self.batch.append((url, prime))
        if len(self.batch) == self.batch_size:
            self.flush()

    def flush(self):
        responses_futures = (grequests.get(url) for url, _ in self.batch)
        responses = grequests.map(responses_futures)
        for response, (url, prime) in izip(responses, self.batch):
            finish_save_prime(response, prime)
        self.batch = []

Now we can proceed in almost the same way as we did before. The main difference is that we add our newly found primes to our AsyncBatcher and let it take care of when to send the requests. In addition, since we are batching, we must make sure to send the last batch even if it is not full (which means making a final call to AsyncBatcher.flush()):

def calculate_primes_async(max_number):
    batcher = AsyncBatcher(100)  # 1
    for number in xrange(max_number):
        if check_prime(number):
            batcher.save(number)
    batcher.flush()
    return

1. We choose to batch at 100 requests, for similar reasons to those illustrated in Figure 8-3.

With this change, we are able to bring our runtime for max_number = 8,192 down to 4.09s. This represents a 13.5x speedup without our having to do much work. In a constrained environment such as a real-time data pipeline, this extra speed could mean the difference between a system being able to keep up with demand and falling behind (in which case a queue will be required; you’ll learn about these in Chapter 10).

In Figure 8-7 we can see a summary of how these changes affect the runtime of our code for different workloads. The speedup in the async code over the serial code is significant, although we are still a ways away from the speeds achieved in the raw CPU problem. For this to be completely remedied, we would need to use modules like multiprocessing to have a completely separate process that can deal with the I/O burden of our program without slowing down the CPU portion of the problem.

Figure 8-7. Processing times for different numbers of primes (serial I/O, async I/O, and I/O disabled)

Wrap-Up

When solving problems in real-world and production systems, it is often necessary to communicate with some outside source. This outside source could be a database running on another server, another worker computer, or a data service that is providing the raw data that must be processed. Whenever this is the case, your problem can quickly become I/O-bound, meaning that most of the runtime is dominated by dealing with input/output.

Concurrency helps with I/O-bound problems by allowing you to interleave computation with potentially multiple I/O operations. This allows you to exploit the fundamental difference between I/O and CPU operations in order to speed up overall runtime.

As we saw, gevent provides the highest-level interface for asynchronous I/O. On the other hand, tornado lets you manually control how the event loop is running, allowing you to use the event loop to schedule any sort of task you want. Finally, asyncio in Python 3.4+ allows full control of an asynchronous I/O stack. In addition to the various levels of abstraction, every library uses a different paradigm for its syntax (the differences stem mainly from the lack of native support for concurrency before Python 3 and the introduction of the yield from statement). We recommend gaining some experience in a range of methods and picking one based on how much low-level control is necessary.

Finally, there are small speed differences among the three libraries we covered. Many of these speed differences are based on how coroutines are scheduled. For example, tornado does an incredible job of launching asynchronous operations and resuming the coroutine quickly. On the other hand, even though asyncio seems to perform slightly worse, it allows much lower-level access into the API and can be tuned dramatically.

In the next chapter, we will take this concept of interleaving computation from I/O-bound problems and apply it to CPU-bound problems. With this new ability, we will be able to perform not only multiple I/O operations at once, but also many computational operations. This capability will allow us to start to make fully scalable programs where we can achieve more speed by simply adding more computer resources that can each handle a chunk of the problem.


[18] With some databases, such as Redis, this is a design choice made specifically to maintain data consistency.

[19] For example, fuggetaboutit is a special type of probabilistic data structure (see Probabilistic Data Structures) that uses the tornado IOLoop to schedule time-based tasks.

[20] Python Enhancement Proposals (PEPs) are how the Python community decides on changes and advances the language. Because it’s part of the standard library, asyncio will always comply with the newest PEP standards for the language and take advantage of any new features.

[21] Most performance applications and modules are still in the Python 2.7 ecosystem.

[22] This is not necessary; it just serves to simplify our code.

[23] This is true for all distributed databases and other popular databases, such as Postgres, MongoDB, Riak, etc.