Task Queues and Scheduled Tasks - Programming Google App Engine (2012)


Chapter 16. Task Queues and Scheduled Tasks

The App Engine architecture is well suited for handling web requests, small amounts of work that run in a stateless environment with the intent of returning a response to the user as fast as possible. But many web applications have other kinds of work that need to get done, work that doesn’t fit in the fast response model. Instead of doing the work while the user waits, it’s often acceptable to record what work needs to get done, respond to the user right away, then do the work later, within seconds or minutes. The ability to make this trade-off is especially useful with scalable web applications that use a read-optimized datastore, since updating an element of data may require several related but time-consuming updates, and it may not be essential for those updates to happen right away.

What we need is a way to do work outside of a user-facing request handler. By “outside,” we mean code that is run separately from the code that evaluates a request from a user and returns a response to the user. This work can run in parallel to the user-facing request handler, or after the request handler has returned a response, or completely independently of user requests. We also need a way to request that this work be done.

App Engine has two major mechanisms for initiating this kind of work: task queues and scheduled tasks. A task is simply a request that a unit of work be performed separately from the code requesting the task. Any application code can call the task queue service to request a task, and the task queue manages the process of driving the task to completion. Scheduled tasks are tasks that are invoked on a schedule that you define in a configuration file, separately from any application code or queue (although a scheduled task is welcome to add other tasks to a queue, as is any other task running on App Engine).

In the terminology of task queues, a producer is a process that requests that work be done. The producer enqueues a task that represents the work onto a queue. A consumer is a process, separate from the producer, that leases tasks on the queue that it intends to perform. If the consumer performs a task successfully, it deletes the task from the queue so no other consumer tries to perform it. If the consumer fails to delete the task, the queue assumes the task was not completed successfully, and after an amount of time, the lease expires and the task becomes available again to other consumers. A consumer may also explicitly revoke the lease if it can’t perform the task.
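The producer, consumer, and lease vocabulary can be made concrete with a small in-memory model. This is only a sketch in plain Python, not the App Engine API: the class LeaseQueue, its method names, and the simulated clock passed as now are all assumptions made for illustration.

```python
import itertools


class LeaseQueue:
    """A toy in-memory queue with leases; not the App Engine API."""

    def __init__(self):
        self._tasks = {}         # task id -> payload
        self._lease_expiry = {}  # task id -> time the current lease ends
        self._ids = itertools.count(1)

    def enqueue(self, payload):
        """Producer side: record a unit of work and return its id."""
        task_id = next(self._ids)
        self._tasks[task_id] = payload
        return task_id

    def lease(self, now, lease_seconds):
        """Consumer side: lease the first task that has no active lease."""
        for task_id, payload in self._tasks.items():
            if self._lease_expiry.get(task_id, 0) <= now:
                self._lease_expiry[task_id] = now + lease_seconds
                return task_id, payload
        return None

    def delete(self, task_id):
        """Called by a consumer after it completes the task."""
        self._tasks.pop(task_id, None)
        self._lease_expiry.pop(task_id, None)

    def revoke(self, task_id):
        """Called by a consumer that cannot perform the task."""
        self._lease_expiry.pop(task_id, None)


queue = LeaseQueue()
task_id = queue.enqueue("resize image 42")

first = queue.lease(now=0, lease_seconds=30)     # consumer A leases the task
blocked = queue.lease(now=10, lease_seconds=30)  # lease still active
retry = queue.lease(now=31, lease_seconds=30)    # A never deleted it; lease expired
queue.delete(task_id)                            # consumer B finishes the work
empty = queue.lease(now=100, lease_seconds=30)   # nothing left on the queue
```

The second lease attempt finds nothing because the first lease is still active. Once that lease expires without a delete, the same task is handed out again.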

Task queues are a general category for two mechanisms for driving work to completion: push queues and pull queues. With push queues, App Engine is the consumer: it executes tasks on the queues at configurable rates, and retries tasks that return a failure code. With pull queues, you provide the consumer mechanism that leases task records off of a queue, does the work they represent, and then deletes them from the queue. Your custom mechanism can run on App Engine, or it can run on your own infrastructure and pull tasks, using a REST API.

To perform a task on a push queue, App Engine does what it does best: it invokes a request handler! You can configure a URL path in your app per queue, or specify a URL path when you enqueue a task. To implement the task, you simply implement a request handler for requests sent to that URL. Naturally, you can secure these URLs against outside requests, so only tasks can trigger that logic. This is why we’ve been making a distinction between “user-facing” request handlers and other handlers. All code on App Engine runs in a request handler, and typically code for handling user requests is distinct from code for performing tasks.

Scheduled tasks also run on App Engine, and also use request handlers. You configure a schedule of URL paths and times, and App Engine calls your application at those URL paths at the requested times. Scheduled tasks are not retried to completion, but you can achieve this effect by having a scheduled task enqueue a task on a queue.

All of these mechanisms support a major design goal for App Engine applications: do as much work outside of user-facing requests as possible, so user-facing requests are as fast as possible. Task queues allow your code to request that work be done separately from the current unit of work. Scheduled tasks initiate computation on a predefined schedule, independently of other code. The results of this work can be stored in the datastore, memcache, and Blobstore, so user-facing request handlers can retrieve and serve it quickly, without doing the work itself.

Enqueueing a task is fast, about three times faster than writing to the datastore. This makes tasks useful for pretty much anything whose success or failure doesn’t need to be reported to the user in the response to the user request that initiates the work. For example, an app can write a value to the memcache, then enqueue a task to persist that value to the datastore. This saves time during the user request, and allows the task to do bookkeeping or make other time-consuming updates based on the change (assuming it meets the application’s needs that the bookkeeping happens later than the initial update).
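The memcache-then-persist example can be sketched in plain Python. This is a toy model rather than App Engine code: the dictionaries cache and datastore and the list pending are hypothetical stand-ins for memcache, the datastore, and a push queue.

```python
cache = {}      # stand-in for memcache (an assumption, not the real service)
datastore = {}  # stand-in for the datastore
pending = []    # stand-in for a push queue of task payloads


def handle_user_request(key, value):
    """Fast path: update the cache, record the work, and respond."""
    cache[key] = value
    pending.append({"key": key, "value": value})
    return "ok"


def run_pending_tasks():
    """Slow path: runs later, outside the user-facing request."""
    while pending:
        task = pending.pop(0)
        datastore[task["key"]] = task["value"]


response = handle_user_request("score:alice", 1200)
visible_before = datastore.get("score:alice")  # not yet persisted
run_pending_tasks()
visible_after = datastore.get("score:alice")   # persisted by the task
```

The sketch carries the value in the task payload instead of reading it back from the cache, since a cache entry may be evicted before the task runs.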

App Engine invokes a request handler for a push queue task or scheduled task in the same environment as it does a handler for a user request, with a few minor differences. Most notably, a task handler can run continuously for up to 10 minutes, instead of 60 seconds for user-facing request handlers. In some cases, it can be better to implement task handlers with short running times, then split a batch of work over multiple tasks, so the tasks can be executed in parallel in multiple threads or on multiple instances. But 10 minutes of head room let you simplify your code for work that can or must take its time on an instance.

Figure 16-1 illustrates how task queues and scheduled tasks take advantage of the request handler infrastructure.


Figure 16-1. Architecture of push queues and scheduled tasks (“cron”)

The development server maintains push and pull queues, and can run push queues in the background and simulate their timing and throttling behaviors. Of course, it won’t be as fast as running on App Engine, but it’s enough to test the behavior. You can use the development console web interface to inspect the configured queues, check their contents, and delete individual tasks or all tasks on a queue (“Purge Queue”). With push queues, you can also force a task to run from the console.

Task queues are an important part of App Engine, with several powerful features for structuring your work and optimizing your application’s behavior. Not the least of these features is how task queues integrate with the App Engine datastore transactions. In this chapter, we describe the concepts of task queues and scheduled tasks, and how to use them in Python and Java applications. We take a brief look at pull queues and consider how they are useful. We cover using tasks and datastore transactions to achieve special effects, especially eventually consistent data operations and task chaining. And finally, we review the queue-related features of the Administration Console.

Configuring Task Queues

Every app has one default push queue with default settings. You can use a configuration file to change the settings of the default push queue, create new named push and pull queues each with their own settings, and set global parameters for task queues in general.

In Python, you create a file named queue.yaml. This is a YAML file (same as app.yaml). Here is an example file that updates the rate of the default queue, and defines a new named push queue with its own configuration:

queue:
- name: default
  rate: 10/s
- name: attack_effects
  rate: 100/s
  bucket_size: 20

In Java, you create a file named queue.xml in your war/WEB-INF/ directory. This is an XML file whose root element is <queue-entries>. The following example does the same thing as the Python example above:

<queue-entries>
  <queue>
    <name>default</name>
    <rate>10/s</rate>
  </queue>
  <queue>
    <name>attack_effects</name>
    <rate>100/s</rate>
    <bucket-size>20</bucket-size>
  </queue>
</queue-entries>

(We’ll see what these settings do in a moment.)

Task queues contain durable data, and this storage counts towards your billable storage quota, just like data in the datastore. You can set a total limit for the amount of task queue data to store with configuration. In queue.yaml, this setting is named total_storage_limit:

total_storage_limit: 200M
queue:
# ...

In queue.xml, the element is named total-storage-limit:

<queue-entries>
  <total-storage-limit>200M</total-storage-limit>
  <!-- ... -->
</queue-entries>

Its value is a number followed by a unit of storage measurement, such as M for megabytes, G for gigabytes, or T for terabytes.

The default mode for a queue is to be a push queue. To declare a queue as a pull queue, you set the mode of the queue to pull. In queue.yaml:

queue:
- name: process_images
  mode: pull

In queue.xml:

<queue-entries>
  <queue>
    <name>process_images</name>
    <mode>pull</mode>
  </queue>
</queue-entries>

We’ll mention additional configuration options when we discuss push queues and pull queues in more detail.

Task queue configuration is uploaded when you deploy your application. You can upload new task queue configuration separately from the rest of your app with the AppCfg tool’s update_queues command:

appcfg.py update_queues appdir

TIP

Task queue configuration is distinct from application configuration (app.yaml or appengine-web.xml), and is kept in a separate file. Unlike application configuration, task queue configuration modifies the behavior of task queues for the entire application. All application versions use the same task queue configuration.

Enqueuing a Task

Your app adds a task to a queue (it enqueues the task) by calling the task queue service API with appropriate arguments. Some arguments are specific to push queues or pull queues, but the main API is the same for both.

We’ll look at the Python task queue API, then the Java API, followed by a summary of options you can set for tasks.

Enqueuing a Task in Python

A Python application enqueues tasks by using the API provided by the google.appengine.api.taskqueue module. The simplest way to enqueue a task is to call the add() function. Without arguments, the add() function enqueues a task to the default queue by using the default options for that queue:

from google.appengine.api import taskqueue

# ...

taskqueue.add()

The default queue is a push queue, so App Engine will process this task by invoking a request handler at a URL path. The default URL path for the default queue is:

/_ah/queue/default

You map the URL to a request handler that performs the task in the app.yaml file. You can restrict the URL so that it can only be called by task queues and by the app’s developers (for testing), using login: admin, like so:

handlers:
- url: /_ah/queue/default
  script: default_task.app
  login: admin

With this configuration, the no-argument call to the add() function enqueues the task on the default queue with the default URL and no arguments. When the task queue processes this task, it issues an HTTP POST request with default arguments to the URL path /_ah/queue/default, which invokes the default_task.app request handler.

The add() function returns immediately after the task is enqueued. The actual execution of the task happens separately from the rest of the code that called add().

You can pass arguments to the add() function to specify aspects of the task or the queue. The Python API offers an object-oriented alternative with the Task and Queue classes. For example, here are several ways to add a task to a queue named reward_players:

# Add a task to the reward_players queue, using the add() function.
taskqueue.add(queue_name='reward_players')

# Construct a Task, and add it to the reward_players queue, using its add() method.
t = taskqueue.Task()
t.add(queue_name='reward_players')

# Construct a Queue for the reward_players queue, then add a Task to it.
t = taskqueue.Task()
q = taskqueue.Queue('reward_players')
q.add(t)

As shown, the queue_name argument to the add() function, the Task object’s add() method, or the Queue constructor, specifies the queue name. This corresponds with the name queue configuration parameter in queue.yaml. If the queue name is anything other than 'default', the queue must appear in the configuration file with the name parameter for the queue to exist.

You set parameters of the task itself by passing keyword arguments to either the add() function or the Task constructor. We’ll introduce these parameters in a moment.

You can add multiple tasks to a single queue in a single service call (a batch call). You must use the Queue object’s add() method for this. To make a batch call, pass an iterable of Task objects as the first argument to add():

# ...
tasks = []
for e in elems:
    tasks.append(taskqueue.Task(params=e.params))

queue = taskqueue.Queue('process_elems')
queue.add(tasks)

Enqueuing a Task in Java

The Java task queue API is provided by the com.google.appengine.api.taskqueue package. You manipulate a queue via a Queue instance, which you get from the QueueFactory static methods getDefaultQueue(), which returns the default queue, and getQueue(name), which returns the queue of the given name:

import com.google.appengine.api.taskqueue.Queue;

import com.google.appengine.api.taskqueue.QueueFactory;

// ...

Queue defaultQueue = QueueFactory.getDefaultQueue();

Queue queue = QueueFactory.getQueue("reward_players");

To add a task to a queue, you call its add() method. With no arguments, add() puts a task onto the queue with default options. add() returns a TaskHandle, which describes the task just added, including fields filled in by the system (such as getQueueName()):

TaskHandle handle = defaultQueue.add();

The default queue is a push queue, so App Engine will process this task by invoking a request handler at a URL path. The default URL path for the default queue is:

/_ah/queue/default

If you need them, the default queue name and URL path are available as the constants Queue.DEFAULT_QUEUE and Queue.DEFAULT_QUEUE_PATH.

You map a servlet to the URL in the usual way, with web.xml. You can restrict access to the URL so that only App Engine can call it (and outsiders can’t), using a security-constraint with the admin role:

<servlet>
  <servlet-name>defaultqueue</servlet-name>
  <servlet-class>app.DefaultQueueServlet</servlet-class>
</servlet>
<servlet-mapping>
  <servlet-name>defaultqueue</servlet-name>
  <url-pattern>/_ah/queue/default</url-pattern>
</servlet-mapping>

<security-constraint>
  <web-resource-collection>
    <web-resource-name>defaultqueue</web-resource-name>
    <url-pattern>/_ah/queue/default</url-pattern>
  </web-resource-collection>
  <auth-constraint>
    <role-name>admin</role-name>
  </auth-constraint>
</security-constraint>

Adding an empty task record to a queue can be quite useful, since the task handler can do anything you want. You can further control the behavior of the task and how it interacts with the queue, using options. To set options for a task, you build and pass a TaskOptions instance to the add() method. This is a Builder-based API, where you can start a new instance with a static method and chain additional options to the same statement. TaskOptions.Builder.withDefaults() returns a TaskOptions with all default settings, which is useful in cases where a TaskOptions is required but no options need to be changed. (add() with no arguments is equivalent to add(TaskOptions.Builder.withDefaults()).)

queue.add(TaskOptions.Builder
          .withParam("player_id", playerId)
          .param("achievement", "paid_in_full"));

We’ll discuss some of the available options in the next section and elsewhere in this chapter.

You can add multiple tasks to a single queue in a single service call (a batch call) by passing an Iterable<TaskOptions>. This form of the method returns a List<TaskHandle>, whose members correspond directly with the inputs:

TaskOptions t1, t2, t3;
// ...

List<TaskHandle> handles = queue.add(Arrays.asList(t1, t2, t3));

Task Parameters

A task record on a queue carries two kinds of parameters: parameters that are passed on to the code or system performing the task, and parameters that affect how the task is managed on the queue. You set these parameters when you enqueue the task. After the task is enqueued, the parameters can’t be changed, although you can delete and re-create a task, as needed.

The following sections describe task parameters common to both push queues and pull queues. We’ll look at mode-specific options later.

Payloads

A task’s payload is a set of data intended for the system performing the task. You don’t need a payload if the task handler already knows what to do, but it’s useful to write task handling code in a general way, and parameterize its behavior with a payload.

For example, you could have a task that performs a transformation on a datastore entity, such as to update its property layout to a new schema. The task handler would take the ID of an entity to transform, and perform one transformation. You’d then have a process (possibly also managed with task queues) that traverses all the entities that need transformation with a datastore query, and it creates a task for each entity, using the task handler and a payload.

For convenience, the task API has two ways to set a payload: as a byte string, or as a set of named parameters with byte string values. When you set a payload as a set of parameters, the data is formatted like a web form (application/x-www-form-urlencoded), so the task handler can parse the payload into parameters, using typical web request handling code. Payloads and parameters are mutually exclusive: you set one or the other, not both.
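To see what "formatted like a web form" means in practice, the following sketch encodes a set of named parameters and parses them back. It uses only the Python 3 standard library (urllib.parse), not the task queue API, and the parameter values are hypothetical.

```python
from urllib.parse import parse_qs, urlencode

# Hypothetical task parameters, as a producer might supply them.
params = {"entity_key": "abc123", "version": "7"}

# Named parameters are serialized the way a browser serializes a form.
body = urlencode(params)

# A handler can recover the parameters with ordinary form-parsing code.
parsed = parse_qs(body)
entity_key = parsed["entity_key"][0]
version = parsed["version"][0]
```

Here body is the string a handler would find in the request body, and parse_qs returns each name mapped to a list of values because a form field may repeat.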

To set a payload in Python, you specify either the payload argument as a str, or the params argument as a mapping of names to values. A parameter value may be a string, or a list of strings:

taskqueue.add(payload=img_data)

taskqueue.add(params={'entity_key': str(e.key()), 'version': '7'})

t = taskqueue.Task(payload=img_data)
q = taskqueue.Queue()
q.add(t)

In Java, you set a payload for a task by using the payload() method or the param() method of TaskOptions, or the corresponding starter methods withPayload() or withParam() of the TaskOptions.Builder class. payload() sets or overrides the payload. param() sets a named parameter, and can be called multiple times in a builder chain to set multiple parameters. A payload or parameter value can be a String or a byte[]:

// byte[] imgData = ...;
queue.add(TaskOptions.Builder.withPayload(imgData));

queue.add(TaskOptions.Builder
          .withParam("entity_key", KeyFactory.keyToString(entity.getKey()))
          .param("version", "7"));

The payload(byte[]) form can take an optional second argument to specify the MIME content type of the data. The payload(String) form can take an optional second argument to specify a character set. These arguments affect how the payload is converted into an HTTP request when managed by a push queue.

Task Names

Every task has a unique name. By default, App Engine will generate a unique name for a task when it is added to a queue. You can also set the task name in the app. A task name can be up to 500 characters, and can contain letters, numbers, underscores, and hyphens.

If an app sets the name for a task and another task already exists for that name on a given queue, the API will raise an exception when the app adds the task to the queue. Task names prevent the app from enqueuing the same task more than once on the same queue. App Engine remembers in-use task names for a period of time after the task completes (on the order of days). (The remembered names are called tombstones or tombstoned tasks.)

This is especially useful when enqueuing a task from within a push task handler. Consider the datastore entity transformation example again. A master task performs a datastore query, then creates a transformation task for each entity in the results, like so (in Python):

class MasterTaskHandler(webapp2.RequestHandler):
    def post(self):
        for entity in models.MyEntity.all():
            taskqueue.add(queue_name='upgrade',
                          params={'entity_key': str(entity.key()),
                                  'version': '7'})

If there is a datastore error while the master task is fetching results, the datastore raises an exception, which bubbles up to webapp and the request handler returns an HTTP 500 server error. The push queue sees the error, then retries the master task from the beginning. If the first run of the master task successfully enqueued some tasks for entities to the 'upgrade' queue, those entities will be added to the queue again, wasting work.

The master task handler can guard against this by using a task name for each task that uniquely represents the work. In the example above, a good task name might be the entity’s key concatenated with the upgrade version (the two parameters to the task):

import re
import webapp2
from google.appengine.api import taskqueue

class MasterTaskHandler(webapp2.RequestHandler):
    def post(self):
        for entity in models.MyEntity.all():
            try:
                # Name the task after the work it represents.
                task_name = str(entity.key()) + '7'
                task_name = re.sub('[^a-zA-Z0-9_-]', '_', task_name)
                taskqueue.add(queue_name='upgrade',
                              name=task_name,
                              params={'entity_key': str(entity.key()),
                                      'version': '7'})
            except taskqueue.DuplicateTaskNameError:
                # Already enqueued by an earlier run of this handler.
                pass

As seen here, in Python you set the task name with the name parameter. An attempt to add a task with a name already in use raises a DuplicateTaskNameError. (In this example, we catch and ignore the exception because we can be confident that the task is enqueued and will be completed.)

In Java, you set the task name with the taskName() (withTaskName()) builder method of TaskOptions:

String taskName = KeyFactory.keyToString(entity.getKey()) + "7";
taskName = taskName.replaceAll("[^a-zA-Z0-9_-]", "_");

queue.add(TaskOptions.Builder
          .withTaskName(taskName)
          .param("entity_key", KeyFactory.keyToString(entity.getKey()))
          .param("version", "7"));

TIP

When you build task names from datastore keys, query cursors, and other values, make sure the resulting name meets the requirements for task names. A task name can contain letters, numbers, underscores, and hyphens. Base64-encoded values (such as string-ified datastore keys) use this alphabet, but may use equal-sign (=) characters for padding. The examples above use a regular expression to substitute characters outside of this alphabet with underscores.
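The naming rules can be wrapped in a small helper. The following is a sketch in plain Python: the function name make_task_name and the sample string are hypothetical, and the 500-character limit is the one stated earlier in this section.

```python
import re

MAX_TASK_NAME_LENGTH = 500  # limit stated in this section


def make_task_name(*parts):
    """Join parts and replace characters not allowed in task names."""
    name = re.sub(r"[^a-zA-Z0-9_-]", "_", "".join(parts))
    if not name or len(name) > MAX_TASK_NAME_LENGTH:
        raise ValueError("task name must be 1 to 500 characters")
    return name


# 'aGVsbG8=' stands in for an encoded key that ends in padding.
name = make_task_name("aGVsbG8=", "-v", "7")
```

Substitution can in principle map two distinct inputs to the same name, so a helper like this suits inputs whose allowed characters already distinguish them.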

Countdowns and ETAs

By default, a task is made available to run immediately. A push queue can execute an available task whenever it is ready (subject to its rate limiting configuration, which we’ll see later). The consumer of a pull queue sees only available tasks when it requests a lease.

You can delay the availability of a task when you add it, so it doesn’t become available until a later time. You specify this as either a number of seconds into the future from the time of the enqueue operation (a countdown), or an explicit date and time in the future (an earliest time of availability, or ETA). Delaying the availability of a task can be a useful way to slow down a complex multistage process, such as to avoid hitting a remote server too often.

In Python, these are the countdown and eta options, respectively. A countdown is a number of seconds. An eta is a datetime.datetime value in the future; a value with no time zone is treated as UTC:

import datetime

# Execute no earlier than 5 seconds from now.
taskqueue.add(params={'url': next_url}, countdown=5)

# Execute no earlier than December 31, 2012, midnight UTC.
taskqueue.add(params={'url': next_url},
              eta=datetime.datetime(2012, 12, 31, 0, 0, 0))

In Java, you use the countdownMillis() (withCountdownMillis()) or etaMillis() (withEtaMillis()) builder methods with TaskOptions. countdownMillis() takes a number of milliseconds in the future. etaMillis() takes a date and time in the future, as a Unix epoch date-time in milliseconds:

// Execute no earlier than 5 seconds from now.
queue.add(TaskOptions.Builder
          .withParam("url", nextUrl)
          .countdownMillis(5000));

// Execute no earlier than December 31, 2012, midnight UTC.
queue.add(TaskOptions.Builder
          .withParam("url", nextUrl)
          .etaMillis(1356912000000L));

TIP

Countdowns and ETAs specify the earliest time the task will be available, not the exact time the task will be performed. Do not rely on ETAs as exact timers.
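Because a countdown and an ETA are two ways of stating the same earliest-availability time, the conversion between them is simple arithmetic. The sketch below works it through with the Python standard library only. The helper countdown_to_eta is hypothetical, and the datetime is treated as UTC.

```python
import calendar
import datetime

# Midnight UTC on December 31, 2012, as a naive datetime treated as UTC.
eta = datetime.datetime(2012, 12, 31, 0, 0, 0)

# Seconds since the Unix epoch, then milliseconds for APIs that take them.
eta_seconds = calendar.timegm(eta.timetuple())
eta_millis = eta_seconds * 1000


def countdown_to_eta(enqueue_seconds, countdown_seconds):
    """Return the earliest availability time implied by a countdown."""
    return enqueue_seconds + countdown_seconds


# Enqueuing 5 seconds before midnight with a 5-second countdown
# yields the same earliest availability as the explicit ETA.
available_at = countdown_to_eta(eta_seconds - 5, 5)
```

Either form sets only a lower bound: the task may run at any time after the computed moment.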

Push Queues

Push queues are queues of tasks that are performed automatically by App Engine at a configurable rate. App Engine performs a task by invoking a request handler of your app. It forms an HTTP request based on the contents of the task record, and issues the request to a URL path associated with the task. App Engine uses the HTTP status code of the response to decide whether the task was completed successfully and should be deleted from the queue. Unsuccessful tasks are retried again later.
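The success-or-retry decision can be modeled in a few lines of plain Python. This is a simulation, not App Engine code. It assumes that any 2xx status counts as success, and it bounds retries with a hypothetical max_attempts argument so that the loop terminates. The handlers are stand-ins that return a status code.

```python
def is_success(status_code):
    """Assumption: any 2xx response marks the task as complete."""
    return 200 <= status_code <= 299


def run_push_task(handler, max_attempts):
    """Invoke handler until it succeeds; return attempts used, or None."""
    for attempt in range(1, max_attempts + 1):
        if is_success(handler(attempt)):
            return attempt
    return None


def flaky_handler(attempt):
    """Hypothetical handler: fails twice with a 500, then succeeds."""
    return 500 if attempt < 3 else 200


def missing_handler(attempt):
    """No handler is mapped to the URL, so every attempt gets a 404."""
    return 404


attempts_needed = run_push_task(flaky_handler, max_attempts=5)
never_done = run_push_task(missing_handler, max_attempts=5)
```

The flaky handler completes on its third attempt. The task with no mapped handler never completes within the bound, which is why such tasks stay on a queue until they are deleted or a working handler is deployed.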

Because tasks on push queues are just requests to your app, they use the same infrastructure as any other request handler. You implement tasks by implementing request handlers mapped to URLs, using your web application framework of choice. Tasks are executed in threads of instances, and use the same automatic scaling mechanism as user requests. A queue with multiple tasks will distribute the tasks to multiple threads and instances to be performed in parallel, based on the availability of instances and the processing rate of the queue.

You can control aspects of the HTTP request for a task by setting task options. You can also configure aspects of how push queues process tasks, and how tasks are retried.

Task Requests

You can set various aspects of the HTTP request issued for a task using task options, including the URL, the HTTP method, and request headers. The payload for the task also becomes part of the request, depending on the method.

By default, the URL path for a task is based on the queue name, in this format:

/_ah/queue/queue_name

You can override the URL path for an individual task. In Python, this is the url option to taskqueue.Task() or taskqueue.add():

taskqueue.add(url='/admin/tasks/persist_scores')

In Java, this is the url() (withUrl()) builder method on TaskOptions:

queue.add(TaskOptions.Builder
          .withUrl("/admin/tasks/persist_scores"));

TIP

If there is no request handler mapped to the URL for a task (or the task’s queue, if no custom URL is specified), the invocation of the task will return a 404 status code. This is interpreted by the push queue as task failure, and the task is added back to the queue to be retried. You can delete these tasks by flushing the queue in the Administration Console, or by pushing a software version that supplies a successful handler for the task URL.

By default, the HTTP request uses the POST method. In Python, you can change this with the method option, with one of these string values: 'GET', 'POST', 'PUT', 'HEAD', or 'DELETE'. In Java, this is the method() (withMethod()) TaskOptions builder method, which takes a value from the TaskOptions.Method enum: GET, POST, PUT, HEAD, or DELETE.

You can set HTTP headers on the task’s request. In Python, you provide a headers argument, whose value is a mapping of header names to header string values. In Java, you can set an individual header with the header() (withHeader()) builder method of TaskOptions, passing it a string name and a string value. Alternatively, you can set multiple headers in one call with the headers() (withHeaders()) builder method, which takes a Map<String, String>.

Task queues have special behavior with regard to app versions. If the version of the app that enqueued a task was the default version, then the task uses the default version of the app when it executes, even if the default version has changed since the task was enqueued. If the version of the app that enqueued the task was not the default version at the time, then the task uses that version specifically when it executes. This allows you to test nondefault versions that use tasks before making them the default. You can set a specific version for a task by using the target option in Python, or the target() (withTarget()) builder method of TaskOptions in Java.
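The version rule reads more easily as a decision function. The sketch below restates it in plain Python. The function version_for_task and the version names are hypothetical, and the assumption that an explicit target overrides the other two cases is an interpretation of the text rather than documented behavior.

```python
def version_for_task(enqueued_by, default_at_enqueue, default_now, target=None):
    """Pick the app version that should execute a task."""
    if target is not None:
        return target       # assumed: an explicit target takes precedence
    if enqueued_by == default_at_enqueue:
        return default_now  # follow the default, even if it has moved
    return enqueued_by      # stay pinned to the nondefault version


# Hypothetical version names.
a = version_for_task("v1", default_at_enqueue="v1", default_now="v2")
b = version_for_task("beta", default_at_enqueue="v1", default_now="v2")
c = version_for_task("v1", default_at_enqueue="v1", default_now="v2",
                     target="v3")
```

In the first call the task follows the default to v2. In the second it stays on beta, which is what makes testing a nondefault version safe.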

App Engine adds the following headers to the request automatically when invoking the request handler, so the handler can identify the task record:

• X-AppEngine-QueueName, the name of the queue issuing the task request.

• X-AppEngine-TaskName, the name of the task, either assigned by the app or assigned by the system.

• X-AppEngine-TaskRetryCount, the number of times this task has been retried.

• X-AppEngine-TaskETA, the time this task became available, as the number of microseconds since January 1, 1970. This is set when the app specifies a countdown or an ETA, or if the task was retried with a delay.

Incoming requests from outside App Engine are not allowed to set these headers, so a request handler can test for these headers to confirm the request is from a task queue.
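A handler can use these headers both to confirm the caller and to read the task's metadata. The following sketch is framework-independent Python that takes the headers as a plain dictionary. The function task_info and the sample values are hypothetical.

```python
TASK_QUEUE_HEADER = "X-AppEngine-QueueName"


def task_info(headers):
    """Return task metadata from request headers, or None for other requests."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    if TASK_QUEUE_HEADER.lower() not in lowered:
        return None
    return {
        "queue": lowered["x-appengine-queuename"],
        "task": lowered.get("x-appengine-taskname"),
        "retries": int(lowered.get("x-appengine-taskretrycount", "0")),
    }


info = task_info({
    "X-AppEngine-QueueName": "upgrade",
    "X-AppEngine-TaskName": "task42",
    "X-AppEngine-TaskRetryCount": "2",
})
outsider = task_info({"User-Agent": "curl"})
```

A handler that receives None here could respond with an error status instead of doing the work. The retry count is also useful for logging or for giving up after repeated failures.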

Task requests are considered to be from an administrator user for the purposes of the URL access control in app.yaml or web.xml. You can restrict task URLs to be administrator-only, and then only task queues (and actual app administrators) can issue requests to the URL.

The body of a response from a task’s request handler is ignored. If the task needs to store or communicate information, it must do so by using the appropriate services or by logging messages.

A call to a task handler appears in the request log, just like a user-initiated web request. You can monitor and analyze the performance of tasks just as you would user requests.

Processing Rates and Token Buckets

The processing rate for a queue is controlled using a “token bucket” algorithm. In this algorithm, a queue has a number of “tokens,” and it spends a token for each task it executes. Tokens are replenished at a steady rate up to a maximum number of tokens (the “bucket size”). Both the replenishment rate and the bucket size are configurable for a queue.

If a queue contains a task and has a token, it usually executes the task immediately. If a queue has many tasks and many available tokens, it executes as many tasks as it can afford, immediately and in parallel. If there are tasks remaining, the queue must wait until a token is replenished before executing the next task. The token bucket algorithm gives a queue the flexibility to handle bursts of new tasks, while still remaining within acceptable limits. The larger the bucket, the more tasks an idle queue will execute immediately when the tasks are enqueued all at once.

I say it usually executes the tasks immediately because App Engine may adjust the method and rate of how it executes tasks based on the performance of the system. In general, task queue schedules are approximate, and may vary as App Engine balances resources.

A queue does not wait for one task to finish before executing the next task. Instead, it initiates the next task as soon as a token is available, in parallel with any currently running tasks. Tasks are not strictly ordered, but App Engine makes an effort to perform tasks in the order they are enqueued. Tasks must not rely on being executed serially or in a specific order.
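The token bucket behavior can be simulated in a few lines of plain Python. This is a toy model on a simulated clock, not App Engine's implementation: the class TokenBucket and its method names are hypothetical, and it assumes an idle queue starts with a full bucket.

```python
class TokenBucket:
    """Toy token bucket; times are in seconds on a simulated clock."""

    def __init__(self, rate, bucket_size):
        self.rate = rate                  # tokens replenished per second
        self.bucket_size = bucket_size    # maximum tokens held at once
        self.tokens = float(bucket_size)  # assumed: an idle queue starts full
        self.last = 0.0

    def _refill(self, now):
        elapsed = now - self.last
        self.tokens = min(self.bucket_size, self.tokens + elapsed * self.rate)
        self.last = now

    def take(self, now, wanted):
        """Spend up to `wanted` tokens at time `now`; return how many."""
        self._refill(now)
        spent = min(wanted, int(self.tokens))
        self.tokens -= spent
        return spent


bucket = TokenBucket(rate=5, bucket_size=5)
burst = bucket.take(now=0.0, wanted=12)   # 12 tasks arrive at once
later = bucket.take(now=1.0, wanted=7)    # one second later, 7 still waiting
capped = bucket.take(now=60.0, wanted=7)  # a long idle period refills only to 5
```

Each call starts five tasks. The burst is limited by the bucket size, the second call by one second of replenishment, and the third shows that idle time never accumulates more than a full bucket.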

Each task queue has a name and processing rate (token replenishment rate and bucket size). Every app has a queue named default that processes 5 tasks per second, with a bucket size of 5. If you don’t specify a queue name when enqueueing a task, the task is added to the default queue. You can adjust the rate and bucket size of the default queue, and can set the rate to 0 to turn it off. Tasks enqueued to a paused queue remain on the queue until you upload the new configuration with a positive rate.

Task queues and token buckets help you control how tasks are executed so you can plan for maximizing throughput, making the most efficient use of system resources to execute tasks in parallel. Tasks inevitably share resources, even if the resource is just the pool of warmed-up application servers. Executing a bunch of tasks simultaneously may not be the fastest way to complete all the tasks, since App Engine may need to start up new instances of the application to handle the sudden load. If multiple tasks operate on the same entity groups in the datastore, it may be faster to perform only a few tasks at a time and let datastore retries sort out contention, instead of relying on task retries to drive in all the changes. Limiting the execution rate with token buckets can actually result in faster completion of multiple tasks.

Queue processing rates are configured using the queue configuration file (queue.yaml or queue.xml). You specify the rate of bucket replenishment using the rate option for a queue. Its value is a number, a slash (/), and a unit of time (s for seconds), such as 20/s for twenty tokens per second.

You specify the size of the token bucket with the bucket_size (bucket-size for queue.xml) option. Its value is a number:

queue:
- name: fast_queue
  rate: 20/s
  bucket_size: 10

In addition to the rate and bucket size, you can set a maximum number of tasks from the queue that can be executed at the same time, with the max_concurrent_requests (max-concurrent-requests) option. Its value is the number of tasks. If this many task requests are in progress, the queue will wait to issue another task even if there are tokens in the bucket. This allows for large bucket sizes but still prevents bursts of new tasks from flooding instances. It also accommodates tasks that take a variable amount of time, so slow tasks don’t take over your instances.
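
As a rough illustration of how the concurrency cap interacts with the token bucket, the following sketch computes how many tasks a queue could start at a given moment. The function tasks_to_start() and its parameters are hypothetical, written only to mirror the rule described above.

```python
def tasks_to_start(pending, tokens, in_flight, max_concurrent_requests=None):
    """Return how many pending tasks a queue could start right now."""
    allowed = min(pending, tokens)
    if max_concurrent_requests is not None:
        # Never exceed the cap on simultaneously running task requests,
        # even if the bucket still holds tokens.
        free_slots = max(0, max_concurrent_requests - in_flight)
        allowed = min(allowed, free_slots)
    return allowed


# A large bucket (40 tokens) with a cap of 10 concurrent requests:
print(tasks_to_start(pending=50, tokens=40, in_flight=8,
                     max_concurrent_requests=10))
```

Here only two more tasks may start, because eight are already running, regardless of how many tokens remain.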

Together, these options control the flow of tasks from the push queue into the request queue for the application. If a given queue is processing tasks too quickly, you can upload a new temporary configuration for the queue that tells it to run at a slower rate, and the change will take effect immediately. You can experiment with different rates and token bucket sizes to improve task throughput.

Retrying Push Tasks

To ensure that tasks get completed in a way that is robust against system failure, a task queue will retry a task until it is satisfied the task is complete.

A push queue retries a task if the request handler it invokes returns an HTTP response with a status code other than a “success” code (in the range 200–299). It retries the task by putting it back on the queue with a countdown, so it’ll wait a bit before trying again in the hopes that the error condition will subside. You can configure the retry behavior for every task in a queue by using the queue configuration, and you can override this configuration on a per-task basis with task options.

Under very rare circumstances, such as after a system failure, a task may be retried even if it completed successfully. This is a design trade-off that favors fast task creation over built-in, once-only fault tolerance. A task that can be executed multiple times without changing the end result is called idempotent. Whether a task’s code must be strictly idempotent depends on what the task is doing and how important it is that the calculation it is performing be accurate. For instance, a task that deletes a datastore entity can be retried because the second delete fails harmlessly.
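
The difference is easy to see in a small, self-contained example. The two functions below are hypothetical stand-ins for task work, using in-memory data instead of the datastore: one can be repeated safely, the other cannot.

```python
def grant_badge(badges, badge):
    """Idempotent: granting the same badge twice leaves the set unchanged."""
    badges.add(badge)
    return badges


def grant_gold(account, amount):
    """Not idempotent: every execution changes the balance again."""
    account['gold'] += amount
    return account


badges = set()
account = {'gold': 100}
for attempt in range(2):  # simulate a task that is executed twice
    grant_badge(badges, 'dragon_slayer')
    grant_gold(account, 50)

print('%d badge(s), %d gold' % (len(badges), account['gold']))
```

After the duplicate execution, the badge set is still correct, but the account has been credited twice. A task like grant_gold() needs an additional guard if duplicates matter.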

TIP

Because a task on a push queue is retried when its handler returns anything other than a successful HTTP status code, a buggy handler that always returns an error for a given input will be retried indefinitely, or until the retry limit is reached if a retry limit was specified.

If a task needs to abort without retrying, it must return a success code.
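
One way to apply this advice is to separate permanent failures from transient ones and choose the status code accordingly. The sketch below is framework-neutral: PermanentTaskError and run_task() are hypothetical names, and the returned integer stands for the HTTP status the handler would send.

```python
class PermanentTaskError(Exception):
    """Hypothetical error type: the task can never succeed, so do not retry."""


def run_task(work, payload):
    """Return the HTTP status code a push task handler might respond with."""
    try:
        work(payload)
    except PermanentTaskError:
        # Give up: report success so the queue deletes the task.
        return 200
    except Exception:
        # Presumed transient: a non-2xx status asks the queue to retry.
        return 500
    return 200


def bad_input(payload):
    raise PermanentTaskError('malformed payload')


def flaky_service(payload):
    raise IOError('temporary outage')


print('%d %d' % (run_task(bad_input, {}), run_task(flaky_service, {})))
```

In a real handler you would also log the permanent failure before returning, since the queue will no longer keep a record of it.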

There are five parameters that control how push queues retry a given task. We’ll define these parameters first. Then we’ll see how to set defaults for these parameters in queue configuration, and how to override them for a specific task.

The task_retry_limit is the maximum number of times a failing task is retried before it is deleted from the queue. If you do not specify a retry limit, the task is retried indefinitely, or until you purge the queue or delete the task by some other means. A retry limit is a good guard against perpetual failure (such as a bug in a task), and in some cases it makes sense to abort a task in transient but long-lasting failure conditions. Be sure to set it high enough so that tasks can accommodate brief transient failures, which are to be expected in large distributed systems.

The task_age_limit calls for automatic deletion of an incomplete task after a period of time on the queue. If not specified, the task lives until it succeeds, hits its retry limit, or is deleted by other means. Its value is a number followed by a unit of time: s for seconds, m for minutes, h for hours, d for days. For example, 3d is three days.

When a task fails, it is readded to the queue with a countdown. The duration of this countdown doubles each time the task is retried, a method called exponential backoff. (The queue is “backing off” the failing task by trying it less frequently with each failure.) Three settings control the backoff behavior. min_backoff_seconds is the minimum countdown, the countdown of the first retry. max_backoff_seconds is the maximum; retries will increase the countdown up to this amount. These values are an amount of time, as a number of seconds. Finally, the max_doublings setting lets you set the number of times the countdown doubles. After that many retries, the countdown stays constant for each subsequent retry.
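
To see how the three backoff settings interact, the following sketch computes the sequence of countdowns as this section describes them. The function retry_countdowns() is hypothetical, and the actual scheduler may differ in detail; treat the numbers as an approximation of the shape of the schedule.

```python
def retry_countdowns(retries, min_backoff_seconds, max_backoff_seconds,
                     max_doublings):
    """Return the countdown (in seconds) before each of the first retries."""
    countdowns = []
    for attempt in range(retries):
        # The countdown doubles with each retry, but only max_doublings times.
        doublings = min(attempt, max_doublings)
        countdown = min_backoff_seconds * (2 ** doublings)
        # It never exceeds the configured maximum.
        countdowns.append(min(countdown, max_backoff_seconds))
    return countdowns


doubling_limited = retry_countdowns(6, 1, 3600, 3)
maximum_limited = retry_countdowns(6, 10, 60, 16)
print(doubling_limited)
print(maximum_limited)
```

In the first call the schedule levels off because of max_doublings; in the second it levels off because it reaches max_backoff_seconds first.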

To set any of these retry options as the default for all tasks added to a queue, you add them to the queue configuration file, in a retry_parameters subsection of the queue’s configuration. Here’s an example of retry configuration in queue.yaml:

queue:
- name: respawn_health
  rate: 2/s
  retry_parameters:
    task_retry_limit: 10
    max_doublings: 3

The same configuration in queue.xml is similar:

<queue-entries>
  <queue>
    <name>respawn_health</name>
    <rate>2/s</rate>
    <retry-parameters>
      <task-retry-limit>10</task-retry-limit>
      <max-doublings>3</max-doublings>
    </retry-parameters>
  </queue>
</queue-entries>

To override these settings for a task in Python, you set the retry_options argument to taskqueue.Task() or taskqueue.add() with an instance of the TaskRetryOptions() class. The class’s constructor takes the retry options as keyword arguments, and validates them:

t = taskqueue.Task(
    retry_options=taskqueue.TaskRetryOptions(
        task_retry_limit=10,
        max_doublings=3))
q = taskqueue.Queue('respawn_health')
q.add(t)

In Java, the TaskOptions builder method retryOptions() (withRetryOptions()) takes an instance of the RetryOptions class. This class also uses the builder pattern with methods for each setting: taskRetryLimit(), taskAgeLimitSeconds(), minBackoffSeconds(), maxBackoffSeconds(), and maxDoublings() (and their withXXX() starter equivalents):

Queue queue = QueueFactory.getQueue("respawn_health");
queue.add(TaskOptions.Builder.withRetryOptions(
    RetryOptions.Builder
        .withTaskRetryLimit(10)
        .maxDoublings(3)));

Pull Queues

In our initial definition of a task queue, we said that a queue has a producer and a consumer. With push queues, the producer is application code running in an App Engine request handler, and the consumer is the App Engine push queue mechanism, which calls request handlers to do the actual work of the task. With pull queues, you provide the consumer logic. The consumer calls the pull queue to lease one or more tasks, and the queue ensures that a task is leased to only one consumer at a time. Typically, the consumer deletes the task from the queue after performing the corresponding work, so no other consumer sees it. If the consumer fails to delete it, eventually the lease expires and the pull queue makes the task available to consumers again.

A pull queue is useful when you want to customize the consumer logic. For example, the push queue driver consumes one task at a time, executing a separate request handler for each task. With a pull queue, a custom consumer can lease multiple related tasks at once, and perform them together as a batch. This might be faster or more productive than doing it one at a time. For example, each task might represent an update to an entity group in the datastore. If a pull queue consumer sees multiple updates in the queue, it can lease them all in a batch, and make a single transactional update to the entity group for all of them. This is likely to be faster than multiple push queue tasks each trying to make their own transactional update to the same data.
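
The batching idea can be sketched without any App Engine APIs. Assume, purely for illustration, that each leased task carries a payload of the form board:delta. The hypothetical combine_updates() function collapses a batch of payloads into one total per leaderboard, so the consumer would need only one transactional update per entity group.

```python
from collections import defaultdict


def combine_updates(payloads):
    """Sum score deltas per leaderboard so each group is written once."""
    totals = defaultdict(int)
    for payload in payloads:
        board, delta = payload.split(':')
        totals[board] += int(delta)
    return dict(totals)


# Payloads of four tasks leased together (made-up data).
leased_payloads = ['weekly:10', 'weekly:-3', 'alltime:7', 'weekly:5']
combined = combine_updates(leased_payloads)
print(sorted(combined.items()))
```

Four tasks become two datastore updates here. After applying them, the consumer would delete all four tasks from the queue.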

You can build pull queue consumers on App Engine, using request handlers (such as a scheduled task that processes the queue periodically), or using a long-running process on a backend that polls for new tasks on a recurring basis. You can also build a consumer that runs on a remote system, using the task queue web service REST API. With a remote consumer, your app can enqueue tasks that trigger behavior in separate systems. The REST API also allows you to build remote producers that add tasks to pull queues. (A remote producer can’t add to push queues directly, but a local consumer running on App Engine could periodically convert remotely added tasks to push queue tasks.)

TIP

As of press time for this edition, the task queue REST API is released as an experimental feature. For more information about this feature, see the official App Engine website.

To create a pull queue, you add it to the queue configuration file. A pull queue must have a name, and must have its mode set to pull. In Python’s queue.yaml:

queue:
- name: update_leaderboard
  mode: pull

In Java’s queue.xml:

<queue-entries>
  <queue>
    <name>update_leaderboard</name>
    <mode>pull</mode>
  </queue>
</queue-entries>

Enqueuing Tasks to Pull Queues

You enqueue a task on a pull queue similarly to how you enqueue a task on a push queue, using a named queue whose mode is pull. As with push queues, a task on a pull queue can have a payload, a task name, and a countdown or ETA.

A task added to a pull queue must have a method set to PULL. This tells the queue that the task is only compatible with the queue when it is in the pull queue mode. In Python, this is the method='PULL' argument:

taskqueue.add(queue_name='update_leaderboard', method='PULL')

In Java, you call the method() (withMethod()) builder method on TaskOptions with the value TaskOptions.Method.PULL:

Queue queue = QueueFactory.getQueue("update_leaderboard");
queue.add(TaskOptions.Builder
    .withMethod(TaskOptions.Method.PULL));

Leasing and Deleting Tasks

A pull queue consumer running on App Engine can use the task queue service API to lease and delete tasks.

A lease is a guarantee that the consumer that acquired the lease has exclusive access to a task for a period of time. During that time, the consumer can do whatever work corresponds with that task record. The consumer is expected to delete the task at the end.

To lease tasks from a pull queue, you call a method of the queue specifying the duration of the lease and the maximum number of tasks. The service reserves up to that many tasks currently available on the queue for the requested amount of time, then returns identifiers for each of the successfully leased tasks. You can use these identifiers to delete the tasks, or update leases.
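
The lease semantics are easier to follow with a toy model. The ToyPullQueue class below is an in-memory stand-in written for this explanation, not the task queue service: it takes the current time as an explicit argument so the expiration of a lease can be shown deterministically.

```python
class ToyPullQueue(object):
    """In-memory stand-in for a pull queue; time is passed in explicitly."""

    def __init__(self):
        self._available_at = {}  # task name -> time the task can be leased

    def add(self, name):
        self._available_at[name] = 0

    def lease_tasks(self, lease_seconds, max_tasks, now):
        # Only tasks without an active lease are candidates.
        free = sorted(n for n, t in self._available_at.items() if t <= now)
        leased = free[:max_tasks]
        for name in leased:
            self._available_at[name] = now + lease_seconds
        return leased

    def delete_tasks(self, names):
        for name in names:
            self._available_at.pop(name, None)


queue = ToyPullQueue()
for name in ('a', 'b', 'c'):
    queue.add(name)

first = queue.lease_tasks(20, 2, now=0)    # one consumer leases two tasks
second = queue.lease_tasks(20, 5, now=10)  # another sees only what is left
queue.delete_tasks(['a'])                  # 'a' was completed, 'b' was not
third = queue.lease_tasks(20, 5, now=25)   # the lease on 'b' has expired
print('%s %s %s' % (first, second, third))
```

Task b reappears for the third lease call because its consumer never deleted it, while c is still hidden behind the lease acquired at time 10.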

In Python, you construct the Queue object for the named pull queue, then call the lease_tasks() method. Its arguments are the lease duration as a number of seconds, and the maximum number of tasks to return. The method returns a list of Task objects, possibly empty if the queue has no available tasks:

from google.appengine.api import taskqueue

# Lease up to 5 tasks from update_leaderboard for 20 seconds.
queue = taskqueue.Queue('update_leaderboard')
tasks = queue.lease_tasks(20, 5)
for task in tasks:
    # Read task.payload and do the corresponding work...
    pass

In Java, you construct the Queue object for the named pull queue, then call the leaseTasks() method. This method takes a LeaseOptions instance, built from LeaseOptions.Builder builder methods. The leasePeriod() (withLeasePeriod()) builder method takes a long and a java.util.concurrent.TimeUnit, which together specify the duration of the lease. The countLimit() (withCountLimit()) builder method sets the maximum number of tasks to lease. leaseTasks() returns a List<TaskHandle>, which may be empty:

import java.util.concurrent.TimeUnit;

// ...

// Lease up to 5 tasks from update_leaderboard for 20 seconds.
Queue queue = QueueFactory.getQueue("update_leaderboard");
List<TaskHandle> tasks = queue.leaseTasks(
    LeaseOptions.Builder
        .withLeasePeriod(20, TimeUnit.SECONDS)
        .countLimit(5));
for (TaskHandle task : tasks) {
    // Read task.getPayload() and do the corresponding work...
}

Once the consumer has executed the work for a task successfully, it must delete the task to prevent it from being re-leased to another consumer. In Python, you call the delete_tasks() method of the Queue, passing it a Task object or a list of Task objects to delete:

# ...
queue.delete_tasks(tasks)

In Java, you call the deleteTask() method of the Queue. The single-task form takes either a TaskHandle or a String task name, and returns true on success. The batch form takes a List<TaskHandle>, and returns a corresponding List<Boolean>:

// ...

List<Boolean> success = queue.deleteTask(tasks);

TIP

Each of the examples shown here leases a batch of tasks, does the work for all the tasks, and then deletes them all with another batch call. When using this pattern, make sure the lease duration is long enough to accommodate all the work in the batch. Even if you delete each task as it finishes, the last task must wait for all of the others.

If the consumer needs more time, you can renew the lease without relinquishing it to another consumer. In Python, the Queue method modify_task_lease() takes the Task and a number of seconds for the new lease. In Java, the Queue method modifyTaskLease() takes a TaskHandle, a long, and a TimeUnit, and returns the new TaskHandle.

Retrying Pull Queue Tasks

When a lease duration on a task expires, the task becomes available on the pull queue. When another consumer leases tasks from the queue, it may obtain a lease on the task and start the work again. This is the pull queue equivalent of a “retry”: if the first consumer failed to delete the task before the lease expired, then the task is assumed to have failed and needs to be tried again.

You can set a limit to the number of times a task is retried. This can be a default for all tasks added to a queue, in the queue configuration. You can also set this for an individual task, overriding the queue default. If the lease for a task is allowed to expire as many times as the limit for the task, the task is deleted automatically.

To configure a retry limit for a queue in queue.yaml, you provide a retry_parameters section in the queue’s configuration, with a task_retry_limit value:

queue:
- name: update_leaderboard
  retry_parameters:
    task_retry_limit: 10

The same configuration in queue.xml is similar:

<queue-entries>
  <queue>
    <name>update_leaderboard</name>
    <retry-parameters>
      <task-retry-limit>10</task-retry-limit>
    </retry-parameters>
  </queue>
</queue-entries>

To set the limit for an individual task in Python, you provide the retry_options argument with a TaskRetryOptions instance as its value, passing the limit to the constructor:

t = taskqueue.Task(
    retry_options=taskqueue.TaskRetryOptions(
        task_retry_limit=10))
q = taskqueue.Queue('update_leaderboard')
q.add(t)

In Java, the TaskOptions builder method retryOptions() (withRetryOptions()) takes an instance of the RetryOptions class, using the taskRetryLimit() (withTaskRetryLimit()) builder method to set the limit:

Queue queue = QueueFactory.getQueue("update_leaderboard");
queue.add(TaskOptions.Builder.withRetryOptions(
    RetryOptions.Builder
        .withTaskRetryLimit(10)));

Transactional Task Enqueueing

Task queues are an essential reliability mechanism for App Engine applications. If a call to enqueue a task is successful, and the task can be completed, the task is guaranteed to be completed, even given the possibility of transient service failure. It is common to pair the reliability of task queues with the durability of the datastore: tasks can take datastore values, act on them, and then update the datastore.

To complete this picture, the task queue service includes an extremely useful feature: the ability to enqueue a task as part of a datastore transaction. A task enqueued within a transaction is only enqueued if the transaction succeeds. If the transaction fails, the task is not enqueued.

This opens up a world of possibilities for the datastore. Specifically, it enables easy transactions that operate across an arbitrary number of entity groups, with eventual consistency.

Consider the message board example from Chapter 7. To maintain an accurate count of every message in each conversation, we have to update the count each time a message is posted. To do this with strong consistency, the count and the message have to be updated in the same transaction, which means they have to be in the same entity group—and therefore every message in the thread has to be in the same entity group. This might be acceptable for a count of messages per conversation, since it’s unlikely that many users will be posting to the same conversation simultaneously, and even so, the delay for resolving concurrency failures might not be noticed.

But what if we want a count of every message on the website? Putting every message in a single entity group would be impractical, as it would effectively serialize all updates to the entire site. We need a way to update the count reliably without keeping everything in one entity group.

Transactional task enqueueing lets us update the count reliably without concern for entity groups. To post a message, we use a transaction to create the message entity and enqueue a task that will update the count. If the transaction fails, the task is not enqueued, so the count remains accurate. The task is performed outside of the transaction, so the count does not need to be in the same entity group as the message. Transactional enqueueing and task retries together ensure that the count is updated, and only under the proper circumstances.

Of course, this comes with a trade-off: it must be acceptable for the count to be inaccurate between the time the message entity is created and the time the count is updated. In other words, we must trade strong consistency for eventual consistency. Transactional task enqueueing gives us a simple way to implement eventually consistent global transactions.
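
The all-or-nothing behavior can be modeled in a few lines. The ToyTransaction class below is a hypothetical in-memory model, not the datastore or task queue API: it buffers entity writes and task enqueues, and applies both on commit or neither on failure.

```python
class ToyTransaction(object):
    """Buffers entity writes and task enqueues; applies both or neither."""

    def __init__(self, datastore, queue):
        self.datastore = datastore
        self.queue = queue
        self.writes = {}
        self.tasks = []

    def put(self, key, value):
        self.writes[key] = value

    def enqueue(self, task):
        self.tasks.append(task)

    def commit(self, fail=False):
        if fail:
            return False  # nothing is written, nothing is enqueued
        self.datastore.update(self.writes)
        self.queue.extend(self.tasks)
        return True


datastore, queue = {}, []

txn = ToyTransaction(datastore, queue)
txn.put('message:1', 'hello')
txn.enqueue('increment_message_count')
committed = txn.commit()

failed = ToyTransaction(datastore, queue)
failed.put('message:2', 'lost')
failed.enqueue('increment_message_count')
failed.commit(fail=True)  # simulate a concurrency failure

print('%d message(s), %d task(s)' % (len(datastore), len(queue)))
```

Only the committed transaction leaves a message and a count-update task behind, so the eventual count matches the number of stored messages.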

You might think that eventual consistency is suitable for the global message count, because who cares if the message count is accurate? But eventual consistency is useful for important data as well. Say the user Alicandria posts a quest with a bounty of 10,000 gold, and a guild of 20 players completes the quest, claiming the bounty. Since any player can trade gold with any other player, it is impractical to put all players in the same entity group. A typical person-to-person exchange can use a cross-group transaction, but this can only involve up to five entity groups. So to distribute the bounty, we use transactional task enqueueing: the app deducts 10,000 gold pieces from Alicandria’s inventory, then enqueues a task to give 500 gold pieces to each member of the guild, all in a transaction. We use task names and memcache locks to ensure the system doesn’t accidentally create new gold pieces if it retries the task. Also, since the guild might get angry if they don’t get their money quickly, we configure the gold transfer queue to execute at a fast rate and with a large token bucket.
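
As a sketch of the guard against duplicated gold, the following example pays each guild member at most once per task name, even if the task runs twice. Everything here is hypothetical and in memory: pay_bounty(), the paid_markers set, and the task name are illustrations only. A real implementation would have to record the marker and the balance change atomically, for instance in one datastore transaction per player.

```python
def pay_bounty(balances, paid_markers, task_name, member_keys, total_amount):
    """Credit each member's share once, skipping members already paid."""
    share = total_amount // len(member_keys)
    for key in member_keys:
        marker = (task_name, key)
        if marker in paid_markers:
            continue  # an earlier attempt of this task already paid this member
        # Assumption: in a real app these two updates happen atomically.
        balances[key] = balances.get(key, 0) + share
        paid_markers.add(marker)


balances, paid = {}, set()
guild = ['player%d' % i for i in range(20)]
pay_bounty(balances, paid, 'bounty_quest42', guild, 10000)
pay_bounty(balances, paid, 'bounty_quest42', guild, 10000)  # simulated retry

print('%d gold paid in total' % sum(balances.values()))
```

The retry finds a marker for every member and pays nothing more, so exactly the 10,000 gold deducted from Alicandria is distributed.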

You can enqueue up to five tasks transactionally. In a typical case, it’s sufficient to start a master task within the transaction, then trigger additional tasks as needed, and let the queue-retry mechanism drive the completion of the work.

The API for transactional task enqueuing is simple, and typically just means calling the task queue API during an active transaction. Let’s take a quick look at Python, then Java.

TIP

Only the task enqueuing action joins the datastore transaction. The task itself is executed outside of the transaction, either in its own request handler (push queues) or elsewhere (pull queues). Indeed, by definition, the task is not enqueued until the transaction is committed, so the task itself has no way to contribute further actions to the transaction.

Transactional Tasks in Python

Recall from Chapter 7 that we perform a transaction in Python and ext.db by calling a function that has the decorator @db.transactional. Every datastore call made between the start and end of the function participates in a single transaction, unless the call opts out of the current transaction (by joining or creating another transaction).

The add() function, the add() convenience method of a Task, and the add() method of a Queue all take an optional transactional=True argument. If provided, the task will be enqueued as part of the current transaction. If there is no current transaction, it raises a taskqueue.BadTransactionStateError. If the argument is not provided (or its value is False), the task is enqueued immediately, regardless of whether the transaction commits or aborts.

Here’s an example of transferring gold from one player to many other players in a single transaction:

from google.appengine.api import taskqueue
from google.appengine.ext import db

@db.transactional
def pay_quest_bounty(quest_master_key, guild_member_keys, amount):
    quest_master = db.get(quest_master_key)
    assert quest_master is not None
    quest_master.gold -= amount
    db.put(quest_master)

    # transactional=True makes the enqueue part of the current transaction.
    taskqueue.add(url='/actions/payment/bounty',
                  params={'user_key': guild_member_keys,  # repeated element
                          'total_amount': str(amount)},
                  transactional=True)

The deduction and the task enqueue occur in the same transaction, so there’s no risk of the deduction happening without enqueueing the task (gold disappears), nor is there a risk of the task getting enqueued without the deduction succeeding (gold is created). Assuming the bounty task is implemented correctly (and properly handles edge cases like a guild member’s account being deleted), the transaction will complete with eventual consistency.

Transactional Tasks in Java

In the Java datastore API from Chapter 7, we saw that you start a datastore transaction with the beginTransaction() service method. This method returns a Transaction object. You perform a datastore operation within the transaction by passing the Transaction object to the operation’s method. Alternatively, if you configured the service instance with ImplicitTransactionManagementPolicy.AUTO, you can call an operation’s method without the transaction instance, and it will join the most recently created (and not committed) transaction.

The task queue API works the same way. The add() method of a Queue accepts an optional Transaction instance as its first argument. If you’re using ImplicitTransactionManagementPolicy.AUTO, calling the add() method without this argument while an uncommitted transaction is open will cause the enqueue operation to join the transaction. The task is enqueued when the transaction is committed, if and only if the txn.commit() method is successful.

Here is a Java version of the previous Python example:

import java.util.List;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.Transaction;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;

// ...

    void payQuestBounty(Key questMasterKey,
                        List<String> guildMemberKeyStrs,
                        long amount) {
        DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();

        Transaction txn = datastore.beginTransaction();

        Entity questMaster = datastore.get(questMasterKey);
        // ... Handle the case where questMaster == null...

        Long gold = (Long) questMaster.getProperty("gold");
        gold -= amount;
        questMaster.setProperty("gold", gold);
        datastore.put(questMaster);

        Queue queue = QueueFactory.getDefaultQueue();
        TaskOptions task = TaskOptions.Builder.withUrl("/actions/payment/bounty");
        // Add one user_key parameter per guild member (a repeated element).
        for (String userKeyStr : guildMemberKeyStrs) {
            task = task.param("user_key", userKeyStr);
        }
        task = task.param("total_amount", Long.toString(amount));
        queue.add(task);

        txn.commit();
    }

Task Chaining

A single task performed by a push queue can run for up to 10 minutes. There’s a lot you can get done in 10 minutes, but the fact that there’s a limit at all raises a red flag: a single task does not scale. If a job uses a single task and the amount of work it has to do scales with a growing factor of your app, the moment the amount of work exceeds 10 minutes, the task breaks.

One option is to use a master task, a task whose job is to figure out what work needs to be done, and then create an arbitrary number of tasks to do fixed-size units of the work. For example, the master task could fetch a feed URL from a remote host, and then create a task for each entry in the feed to process it. This goes a long way to doing more work within the 10 minute limit, and is useful to parallelize the units of work to complete the total job more quickly. But it’s still a fixed capacity, limited to the number of child tasks the master task can create in 10 minutes.

For jobs of arbitrary size, another useful pattern is a task chain. The idea is straightforward: complete the job with an arbitrary number of tasks, where each task is responsible for creating the subsequent task, in addition to doing a fixed amount of work. Each task must be capable of performing its own amount of work in a fixed amount of time, as well as determining what the next unit of work ought to be.

Task chains are especially useful when combined with datastore query cursors, which meet these exact requirements. A task that ought to update every entity of a kind (possibly those that match other query criteria) can use the following steps:

1. Start a query for entities of the kind. If the task payload includes a cursor, set the query to start at the cursor location.

2. Read and process a fixed number of entities from the query results.

3. Take the cursor after the last query result. If there are any results after the cursor, create a new task with the new cursor as its payload.

4. Return a success code.
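
The steps above can be sketched with a toy model. In this simulation a list stands in for the entities of a kind, an integer offset stands in for the query cursor, and a deque stands in for the push queue. All of these names are invented for illustration.

```python
from collections import deque

ENTITIES = list(range(25))  # stand-in for the entities of a kind
TASK_SIZE = 10


def run_chain():
    """Process every entity with a chain of fixed-size tasks."""
    queue = deque([{'cursor': 0}])  # the first task starts at the beginning
    processed, tasks_run = [], 0
    while queue:
        task = queue.popleft()
        start = task['cursor']                    # step 1: resume at the cursor
        batch = ENTITIES[start:start + TASK_SIZE]
        processed.extend(batch)                   # step 2: do a fixed unit of work
        new_cursor = start + len(batch)
        if new_cursor < len(ENTITIES):            # step 3: more results remain
            queue.append({'cursor': new_cursor})
        tasks_run += 1                            # step 4: report success
    return processed, tasks_run


processed, tasks_run = run_chain()
print('%d entities processed by %d tasks' % (len(processed), tasks_run))
```

No single task touches more than TASK_SIZE entities, so the total running time of the job is spread over as many tasks as the data requires.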

This produces the simple task chain shown in Figure 16-2. Each new task starts just as the previous task finishes.

Figure 16-2. A simple task chain, where each task does a fixed amount of work, then creates the next task

If the work for the next task can be determined before performing the work for the current task, and the next task does not depend upon the completion of the current task, we can improve the performance of this job by creating the next task before we begin the work. In the case of iterating over every result of a query, we can get the next cursor immediately after performing the query:

1. Start a query for entities of the kind. If the task payload includes a cursor, set the query to start at the cursor location.

2. Read a fixed number of entities from the query results.

3. Take the cursor after the last query result. If there are any results after the cursor, create a new task with the new cursor as its payload.

4. Process the results from the query.

5. Return a success code.

This technique compresses the timeline so the work of each task is performed concurrently. App Engine will perform the tasks up to the capacity of the queue, and will utilize instances based on your app’s performance settings, so you can create tasks aggressively and throttle their execution to your taste. Figure 16-3 shows the timeline of the compressed behavior.

Figure 16-3. An improved task chain, where each task creates the next task before doing its own work

The last step of each task is to return a success code. This tells the push queue that the task was successful and can be deleted from the queue. If the task does not return a success code, such as due to a transient service error throwing an uncaught exception, the push queue puts the task back on the queue and tries it again. As we’ve described it so far, this is a problem for our task chain, because retrying one task will create another task for the next unit of work, and so on down the rest of the chain. We might end up with something like Figure 16-4, with a ton—potentially an exponential amount—of wasted work.

Figure 16-4. A transient error in a naive task chain explodes into many chains of wasted work, as links in the chain are retried

You might think the solution is to go back to the previous version of the task, where the work is performed before the next task is enqueued. This would reduce the likelihood that a transient service error occurs after the next link in the chain is created, but this doesn’t eliminate the possibility. Even with no lines of code following the task enqueue operation in the handler, a failure on the app instance might still cause an error condition, and a fork in the chain.

The real solution is to use task names. As we saw earlier, every task has a unique name, either specified by the app or by the system. A given task name can only be used once (within a reasonably long period of time, on the order of days). When a named task finishes, it leaves behind a “tombstone” record to prevent the name from being reused right away.

A task name can be any string that identifies the next unit of work, and that the current task can calculate. In the datastore traversal example, we already have such a value: the query cursor. We can prepend a nonce value that identifies the job, to distinguish the query cursor for this job from a similar cursor of a job we might run later.

Our resilient task routine is as follows:

1. Start a query for entities of the kind. If the task payload includes a cursor, set the query to start at the cursor location.

2. Read a fixed number of entities from the query results.

3. Take the cursor after the last query result. If there are any results after the cursor, prepare to create a new task. If the task payload contains a nonce value for the job, use it, otherwise generate a new one. Generate the next task name based on the nonce value and the new query cursor. Create a new task with the task name, and the nonce value and the new cursor as its payload.

4. Process the results from the query.

5. Return a success code.
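
To illustrate why names stop the chain from forking, here is a toy queue that refuses a name it has already seen, standing in for the tombstone behavior. ToyNamedQueue and chain_step() are hypothetical, an integer offset plays the role of the query cursor, and the entity processing itself is omitted.

```python
class ToyNamedQueue(object):
    """Toy queue that accepts each task name only once."""

    def __init__(self):
        self.seen_names = set()  # includes "tombstones" of finished tasks
        self.pending = []

    def add(self, name, payload):
        if name in self.seen_names:
            return False  # duplicate name: refuse to enqueue again
        self.seen_names.add(name)
        self.pending.append((name, payload))
        return True


def chain_step(queue, job_id, cursor, total, task_size=10):
    """Enqueue the next link, named after the job and the new cursor."""
    new_cursor = min(cursor + task_size, total)
    if new_cursor < total:
        queue.add('%s_%d' % (job_id, new_cursor), {'cursor': new_cursor})
    return new_cursor


queue = ToyNamedQueue()
chain_step(queue, 'job1', 0, 25)
chain_step(queue, 'job1', 0, 25)  # the same task, retried after an error

print('%d pending task(s)' % len(queue.pending))
```

The retried task computes the same name for its successor, so the second enqueue is rejected and only one next link exists.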

Transient errors no longer bother us, resulting in a pattern like Figure 16-5. Tasks that fail due to transient errors are retried and may cause their units of work to complete later, but they no longer cause the rest of the chain to be re-created for each failure.

Figure 16-5. Named tasks prevent an exploding chain during a transient error

We close this discussion of task chains with an example implementation in Python. You can find the equivalent Java example in the sample code package on the website for this book:

import datetime
import re
import time
import urllib
import webapp2
from google.appengine.api import taskqueue
from google.appengine.ext import db

TASK_SIZE = 10

class Quest(db.Model):
    # ...
    end_timestamp = db.IntegerProperty()
    end_datetime = db.DateTimeProperty()

class UpgradeQuestEntitiesTaskHandler(webapp2.RequestHandler):
    def post(self):
        query = Quest.all()
        cursor = self.request.get('cursor', None)
        if cursor:
            query.with_cursor(cursor)
        results = query.fetch(TASK_SIZE)

        new_cursor = query.cursor()
        query.with_cursor(new_cursor)
        if query.count(1) == 1:
            job_id = self.request.get('job_id')
            task_name = job_id + '_' + new_cursor
            task_name = re.sub('[^a-zA-Z0-9_-]', '_', task_name)
            try:
                taskqueue.add(
                    name=task_name,
                    url='/admin/jobs/upgradequests/task',
                    params={
                        'job_id': job_id,
                        'cursor': new_cursor})
            except (taskqueue.TaskAlreadyExistsError,
                    taskqueue.TombstonedTaskError):
                # A previous attempt already created the next link.
                pass

        # Do the work.
        for quest in results:
            # Upgrade end_timestamp to end_datetime.
            quest.end_datetime = datetime.datetime.fromtimestamp(
                quest.end_timestamp)
        db.put(results)

class StartUpgradeQuestEntitiesJob(webapp2.RequestHandler):
    def get(self):
        started_job_id = self.request.get('job_id', None)
        if started_job_id is not None:
            self.response.out.write(
                '<p>Job started: %s</p>' % started_job_id)

        self.response.out.write("""
<form action="/admin/jobs/upgradequests/start" method="POST">
  <input type="submit" value="Start New Upgrade Quest Entities Job" />
</form>
""")

    def post(self):
        job_id = 'UpgradeQuestEntities_%s' % int(time.time())
        taskqueue.add(
            name=job_id,
            url='/admin/jobs/upgradequests/task',
            params={'job_id': job_id})
        self.redirect('/admin/jobs/upgradequests/start?'
                      + urllib.urlencode({'job_id': job_id}))

application = webapp2.WSGIApplication([
    ('/admin/jobs/upgradequests/task', UpgradeQuestEntitiesTaskHandler),
    ('/admin/jobs/upgradequests/start', StartUpgradeQuestEntitiesJob)],
    debug=True)

Task Queue Administration

The Administration Console provides a great deal of information about the current status of your task queues and their contents. The Task Queues panel lists all the queues you have configured, with their rate configurations and current running status. You can click on any queue to get more information about individual tasks, such as their calling parameters and how many times they have been retried. You can also delete tasks or force tasks to run, pause and restart the queue, or purge all tasks.

The features of this panel are intuitive, so we’ll just add one comment on a common use of the panel: finding and deleting stuck tasks. If a queue has a task that is failing and being retried repeatedly, the Oldest Task column may have a suspiciously old date. Select the queue, then browse for a task with a large number in the Retries column. You can trace this back to logs from an individual attempt by copying the URL from the Method/URL column, then going to the Logs panel to do a search for that path. You may need to force a run of the task by clicking the Run Now button to get a recent entry to show up in the logs.

How you fix the problem depends on how important the data in the task record is. If the task is failing because of an error in the code that can be fixed, you can leave the task in the queue, fix the bug in the code, and then deploy new code to the target version of the task. When the task is retried, it’ll use the new code, and proceed to completion. If the task is failing because the task record is incompatible with a recent change to the code, you can try to rescue the task record with a code change, or just delete the task record. It’s often easier to delete old task records and re-create the activity they represent than to figure out how to usher them to completion.

Deferring Work

The Python and Java libraries each include a handy utility that makes it easy to throw work into a task without writing a custom task handler. The utility uses a prepackaged general purpose task handler to process deferred work.

Each utility is specific to the language. Let’s see how to defer work in Python first, then in Java.

Deferring Work in Python

To defer work in Python, you create a Python function or other callable object that performs the work to be executed outside of the current request handler, then pass that callable object to the defer() function from the google.appengine.ext.deferred package. The defer() function takes the object to call within the task and arguments to pass to the callable object.

To use this feature, you must set up the deferred work task handler. This is a built-in for app.yaml:

builtins:
- deferred: on

Here’s a simple example that spans two Python modules, one containing the deferred function, and one containing the request handler that defers it. First, here’s the function, to appear in a module named invitation.py:

from google.appengine.api import mail
import logging

_INVITATION_MESSAGE_BODY = '''
You have been invited to join our community...
'''

def send_invitation(recipient):
    mail.send_mail('support@example.com',
                   recipient,
                   'You\'re invited!',
                   _INVITATION_MESSAGE_BODY)
    logging.info('Sent invitation to %s' % recipient)

And here’s the request handler script:

import webapp2
from google.appengine.ext import deferred

import invitation

class SendInvitationHandler(webapp2.RequestHandler):
    def get(self):
        # recipient = ...
        deferred.defer(invitation.send_invitation, recipient)
        # ...

application = webapp2.WSGIApplication([
    ('/sendinvite', SendInvitationHandler),
], debug=True)

The defer() function enqueues a task on the default queue that calls the given callable object with the given arguments. The arguments are serialized and deserialized using Python’s pickle module; all argument values must be pickle-able.

Most Python callable objects can be used with defer(), including functions and classes defined at the top level of a module, methods of objects, class methods, instances of classes that implement __call__(), and built-in functions and methods. defer() does not work with lambda functions, nested functions, nested classes, or instances of nested classes. The task handler must be able to access the callable object by name, possibly via a serializable object, since it does not preserve the scope of the call to defer().

You also can’t use a function or class in the same module as the request handler class from which you call defer(). This is because pickle believes the module of the request handler class to be __main__ while it is running, and so it doesn’t save the correct package name. This is why the previous example keeps the deferred function in a separate module.
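These restrictions follow from how Python's standard pickle module handles functions: it stores a reference to the module and name rather than the code itself. The following sketch uses only the standard library (no App Engine modules) to show which kinds of callables pickle accepts. The helper names can_pickle and make_nested are hypothetical, and json.dumps merely stands in for any function defined at the top level of an importable module:

```python
import json
import pickle


def can_pickle(obj):
    """Return True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_nested():
    def nested():
        return 'nested'
    return nested


top_level_ok = can_pickle(json.dumps)   # found again by module and name
lambda_ok = can_pickle(lambda x: x)     # has no importable name
nested_ok = can_pickle(make_nested())   # only exists inside make_nested()
```

Here top_level_ok is True, while lambda_ok and nested_ok are both False.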

You can control the parameters of the task, such as the delay, by passing additional arguments to defer(). These are the same arguments you would pass to Task(), but with the argument names prepended with an underscore so they are not confused with arguments for the callable:

deferred.defer(invitation.send_invitation,
               'juliet@example.com',
               _countdown=86400)
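To make the naming convention concrete, here is a rough model of how underscore-prefixed arguments could be separated from the arguments meant for the callable. The function split_defer_kwargs and the task name invite-juliet are hypothetical, and this is not the deferred library's actual implementation:

```python
def split_defer_kwargs(kwargs):
    """Separate underscore-prefixed task options from callable arguments.

    Hypothetical helper that models the naming convention only.
    """
    task_options = {}
    callable_kwargs = {}
    for key, value in kwargs.items():
        if key.startswith('_'):
            task_options[key[1:]] = value  # '_countdown' -> 'countdown'
        else:
            callable_kwargs[key] = value
    return task_options, callable_kwargs


options, call_kwargs = split_defer_kwargs({
    '_countdown': 86400,
    '_name': 'invite-juliet',
    'recipient': 'juliet@example.com',
})
```

In this model, options holds the countdown and name values destined for the task, and call_kwargs holds only the recipient argument for the callable.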

To call the callable, the task handler determines the module location of the callable from the description saved by the defer() function, imports the required module, recreates any required objects from their serialized forms, then calls the callable. If the module containing the callable imports other modules, those imports will occur during the task. If the deferred callable requires any additional setup, such as changes to the module import path, make sure this happens in the callable’s module, or within the callable itself.

The task handler determines the success or failure of the task based on exceptions raised by the callable. If the callable raises a special exception called deferred.PermanentTaskFailure, the task handler logs the error, but returns a success code to the task queue so the task is not retried. If the callable raises any other exception, the exception is propagated to the Python runtime and the handler returns an error code, which causes the task queue to retry the task. If the callable does not raise an exception, the task is considered successful.

The deferred library itself raises deferred.PermanentTaskFailure for errors it knows will prevent the task from ever succeeding. In those cases, it logs a message, then returns a success code so that the task is removed from the queue.
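The success and failure rules can be summarized in a few lines. This sketch is a simplified model, not the library's code: the PermanentTaskFailure class here is a local stand-in for deferred.PermanentTaskFailure, run_deferred is a hypothetical name, and the real handler lets other exceptions propagate to the runtime instead of returning 500 itself:

```python
import logging


class PermanentTaskFailure(Exception):
    """Local stand-in for deferred.PermanentTaskFailure."""


def run_deferred(func, *args, **kwargs):
    """Return the HTTP status that the task queue would see."""
    try:
        func(*args, **kwargs)
    except PermanentTaskFailure:
        logging.exception('Permanent failure; the task will not be retried')
        return 200  # success code, so the queue drops the task
    except Exception:
        logging.exception('Task failed; the queue will retry it')
        return 500  # error code, so the queue retries the task
    return 200


def succeeds():
    pass


def gives_up():
    raise PermanentTaskFailure('bad input')


def flaky():
    raise RuntimeError('simulated transient error')


statuses = [run_deferred(succeeds), run_deferred(gives_up), run_deferred(flaky)]
```

Only the last call reports an error code, so only that task would be retried.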

Deferring Work in Java

The Java task queue library’s mechanism for deferring work is based on the DeferredTask interface. This interface describes a class which is both java.lang.Runnable and java.io.Serializable. You provide an implementation of this interface, with a run() method that does the work. To defer a call to this code, you add an instance of your class to a push queue, using a special form of the withPayload() builder method to TaskOptions. This method serializes the instance and puts it on the queue, to be executed by the deferred task handler provided by the runtime environment. No further setup is required, making this a convenient way to run code in the future:

import com.google.appengine.api.taskqueue.DeferredTask;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;

public class MyWork implements DeferredTask {
    String arg;

    public MyWork(String arg) {
        this.arg = arg;
    }

    // run() comes from the Runnable interface, so it must be public.
    public void run() {
        // Do something with arg...
    }
}

// ...

Queue queue = QueueFactory.getDefaultQueue();
queue.add(TaskOptions.Builder.withPayload(new MyWork("my arg")));

Within the run() method, your deferred task can access basic information about the request handler in which it is running by using static methods of the DeferredTaskContext class. The getCurrentServlet() method returns the servlet, and getCurrentRequest() and getCurrentResponse() return the request and response, respectively.

By default, an unchecked exception thrown by the run() method causes the task to be retried. You can disable this behavior by calling the DeferredTaskContext.setDoNotRetry(true) method within the run() method. With retries disabled, any uncaught exception will be treated as a clean exit, and the task will be deleted from the queue.

TIP

The serialized version of the instance of your deferred task is stored as data in the task record. If you upload a new version of your application code and serialized data cannot be deserialized into an instance by using the new code, the task will fail perpetually. Take care to either clear deferred work from task queues before uploading new code, or only make serialization-compatible changes to the code while deferred work is in a task queue.

Scheduled Tasks

Applications do work in response to external stimuli: user requests, incoming email and XMPP messages, HTTP requests sent by a script on your computer. And while task queues can be used to trigger events across a period of time, a task must be enqueued by application code before anything happens.

Sometimes you want an application to do something “on its own.” For instance, an app may need to send nightly email reports of the day’s activity, or fetch news headlines from a news service. For this purpose, App Engine lets you specify a schedule of tasks to perform on a regular basis. In the App Engine API, scheduled tasks are also known as “cron jobs,” named after a similar feature in the Unix operating system.

A scheduled task consists of a URL path to call and a description of the recurring times of the day, week, or month at which to call it. It can also include a textual description of the task, which is displayed in the Administration Console and other reports of the schedule.

To execute a scheduled task, App Engine calls the URL path by using an empty GET request. A scheduled task cannot be configured with parameters, headers, or a different HTTP method. If you need something more complicated, you can do it in the code for the request handler mapped to the scheduled task’s URL path.

As with task queue handlers, you can secure the URL path by restricting it to application developers in the frontend configuration. The system can call such URL paths to execute scheduled tasks.

The HTTP request includes the header X-AppEngine-Cron: true to differentiate it from other App Engine–initiated requests. Only App Engine can set this header. If an external request tries to set it, App Engine removes it before it reaches your app. You can use the header to protect against outside requests triggering the job. Scheduled task requests are also treated like requests from an administrator user (similarly to push queue tasks), so you can guard task URLs by using a login requirement in app.yaml or web.xml.
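A header check of this kind might look like the following sketch. The function is_cron_request is a hypothetical helper that takes any dictionary-like object of request headers. It compares header names case-insensitively, since HTTP header names are not case-sensitive:

```python
def is_cron_request(headers):
    """Return True if the request headers carry X-AppEngine-Cron: true."""
    for name, value in headers.items():
        if name.lower() == 'x-appengine-cron':
            return value == 'true'
    return False


from_cron = is_cron_request({'X-AppEngine-Cron': 'true'})
from_outside = is_cron_request({'User-Agent': 'curl'})
```

In a webapp2 request handler, one plausible use is to call is_cron_request(self.request.headers) at the top of get() and respond with self.abort(403) when it returns False.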

Just like tasks in push queues, scheduled tasks have a request deadline of 10 minutes, so you can do a significant amount of computation and service calls in a single request handler. Depending on how quickly the task needs to be completed, you may still wish to break work into small pieces and use task queues to execute them on multiple instances in parallel.

Unlike push queues, scheduled tasks that fail are not retried. If a failed scheduled task should be retried immediately, the scheduled task should put the work onto a push queue.

The development console does not execute scheduled tasks automatically. If you need to test a scheduled task, you can visit the task URL path while signed in as an administrator. The Python version of the development console includes a Cron Jobs section that lists the URL paths in the configuration file for easy access.

If you have enabled billing for your application, your app can have up to 100 task schedules. At the free billing tier, an app can have up to 20 task schedules.

Configuring Scheduled Tasks

In Python, the schedule is a configuration file named cron.yaml. It contains a value named cron, which is a list of task schedules. Each task schedule has a description, a url, and a schedule. You can also specify a timezone for the schedule:

cron:
- description: Send nightly reports.
  url: /cron/reports
  schedule: every day 23:59
  timezone: America/Los_Angeles
- description: Refresh news.
  url: /cron/getnews
  schedule: every 1 hours

In Java, the corresponding file in the WEB-INF/ directory is named cron.xml. A <cronentries> element contains zero or more <cron> elements, one for each schedule. The <url>, <description>, <schedule>, and <timezone> elements define the scheduled task:

<cronentries>
  <cron>
    <url>/cron/reports</url>
    <description>Send nightly reports.</description>
    <schedule>every day 23:59</schedule>
    <timezone>America/Los_Angeles</timezone>
  </cron>
  <cron>
    <url>/cron/getnews</url>
    <description>Refresh news.</description>
    <schedule>every 1 hours</schedule>
  </cron>
</cronentries>

As with other service configuration files, the scheduled task configuration file applies to the entire app, and is uploaded along with the application. You can also upload it separately. With the Python SDK:

appcfg.py update_cron app-dir

With the Java SDK (appcfg.sh update_cron on Mac OS X or Linux):

appcfg update_cron war

You can validate your task schedule and get a human-readable description of it by using appcfg.py cron_info app-dir (Python) or appcfg cron_info war (Java). The report includes the exact days and times of the next few runs, so you can make sure that the schedule is what you want.

These are the possible fields for each scheduled task:

description

A textual description of the scheduled task, displayed in the Administration Console.

url

The URL path of the request handler to call for this task.

schedule

The schedule on which to execute this task.

timezone

The time zone for the schedule, as a standard “zoneinfo” time zone descriptor (such as America/Los_Angeles). If omitted, the schedule times are interpreted as UTC time.

target

The ID of the app version to use for the task. If omitted, App Engine calls the version that is the default at the time the task executes.

TIP

If you choose a time zone identifier where Daylight Saving Time (DST) is used and have a task scheduled during the DST hour, your task will be skipped when DST advances forward an hour, and run twice when DST retreats back an hour. Unless this is desired, pick a time zone that does not use DST, or do not schedule tasks during the DST hour. (The default time zone UTC does not use DST.)

Specifying Schedules

The value for the schedule element (in either Python or Java) uses a simplified English-like format for describing the recurrence of the task. It accepts simple recurrences, such as:

every 30 minutes

every 3 hours

The minimum interval is every 1 minutes. The parser’s English isn’t that good: it doesn’t understand every 1 minute or every minute. It does understand every day, as an exception.

The interval every day accepts an optional time of day, as a 24-hour hh:mm time. This runs every day at 11:59 p.m.:

every day 23:59

You can have a task recur weekly using the name of a weekday, as in every tuesday, and can also include a time: every tuesday 23:59. In another English parsing foible, day names must use all lowercase letters. You can abbreviate day names using just the first three letters, such as every tue 23:59.

You can have a task recur monthly or on several days of a given month by specifying a comma-delimited list of ordinals (such as 2nd, or first,third) and a comma-delimited list of weekday names (monday,wednesday,friday or sat,sun). You can also include a time of day, as earlier. This occurs on the second and fourth Sunday of each month:

2nd,4th sunday

You can have a task recur yearly by including the word “of” and a comma-delimited list of lowercase month names (january,july, or oct,nov,dec). This schedule executes at 6 p.m. on six specific days of the year:

3rd,4th tue,wed,thu of march 18:00
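To see which six days this schedule selects, we can enumerate them for one year with the standard calendar and datetime modules. This sketch assumes that an ordinal such as 3rd refers to the third occurrence of that weekday within the month. The function matching_dates and the choice of 2024 are illustrative only:

```python
import calendar
import datetime

ORDINALS = {'1st': 1, '2nd': 2, '3rd': 3, '4th': 4, '5th': 5}
WEEKDAYS = {'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3,
            'fri': 4, 'sat': 5, 'sun': 6}


def matching_dates(year, month, ordinals, weekdays):
    """List the dates in a month that are the Nth occurrence of a weekday."""
    wanted_days = set(WEEKDAYS[name] for name in weekdays)
    wanted_ordinals = set(ORDINALS[name] for name in ordinals)
    seen = {}  # weekday number -> occurrences so far this month
    matches = []
    days_in_month = calendar.monthrange(year, month)[1]
    for day in range(1, days_in_month + 1):
        date = datetime.date(year, month, day)
        weekday = date.weekday()  # Monday is 0
        seen[weekday] = seen.get(weekday, 0) + 1
        if weekday in wanted_days and seen[weekday] in wanted_ordinals:
            matches.append(date)
    return matches


dates = matching_dates(2024, 3, ['3rd', '4th'], ['tue', 'wed', 'thu'])
days = [d.day for d in dates]
```

For March 2024, days is [19, 20, 21, 26, 27, 28]: two ordinals times three weekdays gives the six days.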

You can specify recurrences to occur between two times of the day. This executes the task every 15 minutes between 3 a.m. and 5 a.m. every day:

every 15 mins from 03:00 to 05:00

By default, when a schedule uses a time interval without an explicit start time, App Engine will wait for the previous task to complete before restarting the timer. If a task runs every 15 minutes and the task takes 5 minutes to complete, consecutive tasks start 20 minutes apart. If you’d prefer the next task to start at a specific interval from the previous start time regardless of the time taken to complete the previous task (or whether the previous task has finished), use the synchronized keyword:

every 15 mins synchronized
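The difference between the two modes is easy to check with a little arithmetic. The sketch below is a simplified model in which every run takes the same amount of time and times are counted in minutes from the first run. It ignores how App Engine aligns runs to the clock, and the function start_times is hypothetical:

```python
def start_times(interval, duration, count, synchronized=False):
    """Return start times, in minutes, for the first `count` runs.

    Simplified model: every run takes `duration` minutes.
    """
    times = []
    current = 0
    for _ in range(count):
        times.append(current)
        if synchronized:
            current += interval             # measured from the previous start
        else:
            current += duration + interval  # timer restarts after completion
    return times


default_starts = start_times(15, 5, 4)
synchronized_starts = start_times(15, 5, 4, synchronized=True)
```

With a 15-minute interval and a 5-minute task, default_starts is [0, 20, 40, 60], and synchronized_starts is [0, 15, 30, 45].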