gevent: asynchronous I/O made easy - Europython 2014

gevent is a framework for scalable asynchronous IO with a fully synchronous programming model.

Let's look at some examples of where we are going:

  • Echo server

What is the strange monkey.patch_all() call? Not to worry, this isn't like your every day monkey patching. This is just a distribution of Python that happens to be shipped as a set of monkey patches.

  • Chat server
  • Bulk file downloader
  • Ping each other game

Simple enough? Let's look at why this gevent stuff is written like this.

Synchronous IO

Thread and processes. Let the OS schedule each thread/process.

Each thread or process is allowed to individually block waiting on I/O. Because we have full concurrency, this blocking doesn't affect other processes, and

Drawbacks of threads: terrible performance. See Dave Beazley's notes on the GIL. Also high memory use. Threads in Linux allocate stack memory (see ulimit -s). This is not useful for Python - it will just cause you to run out of memory with relatively few threads.

Drawbacks of processes: No shared memory space. High memory use, both because of the stack allocation and copy-on-write. Threads in Linux are like a special kind of process; the kernel structures are more or less the same.

Asyncronous IO

All asynchronous IO falls back to the same pattern. It's not really about how the code executes but where the waiting is done. Multiple I/O activities need to unify their efforts to wait so that the waiting happens in a single place in the code. When an event occurs, the asynchronous system needs to resume the section of the code that was waiting for that event.

The problem then, is not how to do the waiting in a single place, but how to resume the one piece of code expecting to receive that event.

There are several different approaches to how to organise a single-threaded program so that all of the waiting can be done in a single place in code, ie. there are several different ideas about what resume() and waiter might be in the following event loop code:

read_waiters = {}
write_waiters = {}
timeout_waiters = []

def wait_for_read(fd, waiter):
    read_waiters[fd] = waiter

wait_for_write = write_waiters.___setitem__

def event_loop():
    while True:
        readfds = read_waiters.keys()
        writefds = write_waiters.keys()

        read, write, error = select.select(
            readfds,  # waiting for read
            writefds,  # waiting for write
            readfds + writefds,  # waiting for errors
        )

        for fd in read:
            resume(read_waiters.pop(fd))

        for fd in write:
            resume(write_waiters.pop(fd))

        # something about errors

We may want to add timeouts to the above code, in which case we might write something more like the following:

read_waiters = {}
write_waiters = {}
timeout_waiters = []

def wait_for_read(fd, waiter):
    read_waiters[fd] = waiter

wait_for_write = write_waiters.___setitem__

def wait_for_timeout(delay, waiter):
    when = time.time() + delay
    heapq.heappush(timeout_waiters, (when, waiter))

def event_loop():
    while True:
        if timeout_waiters:
            timeout = timeout_waiters[0][0] - time.time()
        else:
            timeout = 0

        readfds = read_waiters.keys()
        writefds = write_waiters.keys()

        read, write, error = select.select(
            readfds,  # waiting for read
            writefds,  # waiting for write
            readfds + writefds,  # waiting for errors
            timeout   # if nothing else, resume here
        )

        for fd in read:
            resume(read_waiters.pop(fd))

        for fd in write:
            resume(write_waiters.pop(fd))

        now = time.time()
        while timeout_waiters:
            if timeout_waiters[0][0] <= now:
                 _, waiter = heapq.heappop(timeout_waiters)
                 resume(waiter)

        # something about errors

All asynchronous IO frameworks are built on the same kind of model; they just take different approaches to how to structure your code so that it can be suspended when an IO operation is requested and resumed when that operation is completed.

Callbacks

Examples:

  • Javascript/Node
  • Tornado IOStream
  • Twisted Deferred
  • asyncio, under the hood

One approach is to just call a callable whenever data is available to read. Typically we want to act on a higher level than chunks of data, so have a callback read and parse individual chunks of binary data and call an application callback when the parsed content is complete (such as a HTTP request or response).

What the user code looks like:

def start_beer_request():
    http.get('/api/beer', handle_response)

def handle_response(resp):
    beer = load_beer(resp.json)
    do_something(beer)

How can we tie the response back to a specific request? One answer is closures:

def get_fruit(beer_id, callback):
    def handle_response(resp):
        beer = load_beer(resp.json)
        callback(beer)

    http.get('/api/beer/%d' % beer_id, handle_response)

Either way is ugly, especially if we wanted to chain more IO calls (anyone fancy this nesting any deeper?). There's no escape from the nesting and programming in pieces. As it has been said, "Callbacks are the new Goto".

Diagram about how this way requires call stacks to be broken into pieces

Method-based callbacks

Examples:

  • Twisted Protocols
  • Tornado RequestHandler
  • asyncio Transports/Protocols

Callbacks are a pain! So build interfaces where the methods are automatically registered as callbacks, subclass to implement them. For example, this is asyncio example code:

import asyncio

class EchoClient(asyncio.Protocol):
    message = 'This is the message. It will be echoed.'

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('data sent: {}'.format(self.message))

    def data_received(self, data):
        print('data received: {}'.format(data.decode()))

    def connection_lost(self, exc):
        print('server closed the connection')
        asyncio.get_event_loop().stop()

This sometimes ends up as a neat solution in simple cases, but you still get the problem of logic being fragmented over multiple callbacks.

This solution doesn't completely solve the problem of eliminating callbacks, it just gives you a neater framework that avoids callbacks being dotted all over the code. It places constraints on what callbacks can be called and where they can be defined. Suppose you want to link two protocols together - for example, you need to make an HTTP request to a backend REST service in the middle of handling a request from a user.

Error handling in callbacks

You have to add error handlers or your internal state may get out of sync, deadlock waiting for an event that will never arrive.

Sadly not all frameworks enforce this. One reason is that it makes every single program twice as hard to read if you do enforce it. So it's optional, and consequently programmers don't always (even often) do it.

Generator-based Coroutine

Examples:

  • tornado.gen
  • asyncio/Tulip

Generators have been used to implement coroutine-like functionality. This allows us to defer to some event loop system which will resume the after-the-callback section of our code after the IO operation has completed:

import asyncio

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

(In this example we should note that we will want to spawn other async activities to get any scalability benefit out of this approach).

When generators were introduced with PEP255 they were described as providing coroutine-like functionality. PEP342 and PEP380 extended this with the ability to send exceptions to a generator and to yield from a subgenerator respectively.

The term "coroutine" denotes a system where you have more than one routine - effectively, call stack - active at a time. Because each call stack is preserved, a routine can suspend its state and yield to a different, named coroutine. In some languages, there is a yield keyword of some form that invokes this behaviour.

Why would you want to do this? This provides a primitive form of multitasking -- cooperative multitasking. Unlike threads, they are not preemptive, which means that they aren't interrupted until they explicitly yield.

Generators are a subset of this behaviour, right down to the 'yield' terminology. Wikipedia calls them semicoroutines. However there are two significant differences between generators and coroutines:

  1. Generators can only yield to the calling frame.
  2. Every frame in the stack needs to collaborate in yielding to the calling frame - so the top frame might yield, and all other calls in the stack need to be made with yield from.
https://docs.python.org/3/_images/tulip_coro.png

Diagram about generator-based coroutines require every frame in the call stacks to collaborate in suspending a stack.

NB: This diagram is literally in the asyncio docs, see https://docs.python.org/3/library/asyncio-task.html

Greenlets/green threads

A greenlet is a full coroutine.

Examples:

  • gevent
  • greenlet
  • Stackless Python

Let's rewrite that asyncio example with gevent:

import gevent

def compute(x, y):
    print "Compute %s + %s ..." % (x, y)
    gevent.sleep(1.0)
    return x + y

def print_sum(x, y):
    result = compute(x, y)
    print "%s + %s = %s" % (x, y, result)

print_sum(1, 2)

(Again, we'll want to start other greenlets to get any scalability benefit out of this approach).

That's much simpler! We can simply omit all the generator cruft because gevent.sleep() can yield to the event loop (called the hub in gevent) without the involvement of the calling frames. Also, because we have first class co-routines, the hub can be instantiated on demand; it doesn't need to be created as the explicit parent of the stack.

Gevent: greenlets plus monkey-patching

Suppose though the sleep wasn't in our code? We can use gevent's monkey-patching to ensure that no code changes are necessary:

# These two lines have to be the first thing in your program, before
# anything else is imported
from gevent.monkey import patch_all
patch_all()

import time

def compute(x, y):
    print "Compute %s + %s ..." % (x, y)
    time.sleep(1.0)
    return x + y

def print_sum(x, y):
    result = compute(x, y)
    print "%s + %s = %s" % (x, y, result)

print_sum(1, 2)

The bulk of blocking calls in the standard library are patched so that they yield to the hub rather than blocking. Likewise the threading system is patched so that greenlets are spawned instead of threads.

Isn't monkey-patching bad?

In this case it's better to think of gevent as a distribution of Python that happens to use green threads, and includes different implementations of standard library code.

This is part of the reason it has to come right at the top of the module that contains the entry point of the program: it's a statement that before anything else happens, we want to use the gevent's distribution of the stdlib.

While the monkey patching is elegant in that it allows pure Python applications and libraries to become asynchronous with no modifications, it is an optional component of gevent, and you can write asynchronous programs without it.

  • Works with any existing synchronous, pure Python code.
  • Generally works with async code too - only select() is emulated, but most async frameworks have a select()-based implementation. No reason epoll() etc couldn't be implemented.

Examples of gevent threading primitives

Because gevent is based on lightweight "threads" the gevent library contains a wealth of concurrency tools that help you spawn greenlets, implement critical sections (locks/mutexes), and pass messages between greenlets.

Because it's a networking library it also has some higher-level network server modules, such as a TCP server and WSGI server.

Spawning and killing greenlets

  • gevent.spawn(function, *args, **kwargs) - spawn a greenlet
  • gevent.kill(greenlet, exception=GreenletExit) - 'kill' a greenlet by raising an exception in that greenlet.

There are also higher-level primitives like gevent.pool, a greenlet-based equivalent to multiprocessing.pool.

Synchronisation primitives

  • gevent.lock.Sempahore
  • gevent.lock.RLock
  • gevent.event.Event

Message Passing

  • gevent.queue.Queue
  • gevent.event.AsyncResult - block waiting for a single result. Also allows an exception to be raised instead.

Timeouts

There's a useful wrapper to kill a greenlet if it doesn't succeed within a certain period of time. This can be used to wrap timeouts around pretty much any sequence of operations:

from gevent import Timeout

with Timeout(5):
    data = sock.recv()

Higher level server kit

  • gevent.server.StreamServer - TCP server. Also supports SSL.
  • gevent.server.DatagramServer - UDP server.
  • gevent.pywsgi.WSGIServer - WSGI server that supports streaming, keepalives, SSL etc.

Gevent IO patterns

The recommended way of using gevent (and avoiding select()) is to spawn one greenlet for each direction of communication - read or write. The code for each greenlet is a simple loop that "blocks" whenever necessary:

import gevent
from gevent.queue import Queue
from gevent.server import StreamServer

queues = set()


def reader(file):
    """Reader greenlet: read and re-broadcast lines of file."""
    for msg in file:  # blocks on IO
        for q in queues:
            q.put(msg)


def writer(queue, sock):
    """Writer greenlet: feed messages from our queue to sock."""
    while True:
        msg = queue.get()  # blocks waiting for message
        sock.sendall(msg)  # blocks on IO


def handle_connection(sock, client_addr):
    """Connection management greenlet."""
    queue = Queue()
    queues.add(queue)
    try:
        r = gevent.spawn(reader, sock.makefile(mode='r'))
        w = gevent.spawn(writer, queue, sock)
        gevent.joinall([r, w])
    finally:
        queues.remove(queue)


if __name__ == '__main__':
    s = StreamServer(('0.0.0.0', 8001), handle_connection)
    s.serve_forever()

Why do we want a syncronous programming model?

Of the methods described above, only gevent requires no changes to the calling conventions of other code. The importance of this should not be understated: it means that your business logic can happily call into blocking code.

As a very simple example, say we want to do a streaming API. Our pre-existing business logic expects an iterable. With gevent, we can stream this iterable from a remote server with no code changes!:

from gevent.socket import create_connection

def process_orders(lines):
    """Do something important with an iterable of lines."""
    for l in lines:
        ...


socket = create_connection((host, port))
f = socket.makefile(mode='r')
line_reader(f)

Another advantage is that exceptions can always be raised, in a sensible place. As PEP20 says,

Errors should never pass silently.
Unless explicitly silenced.

Other advantages of gevent

Unlike real threads, greenlets are never suspended at arbitrary times, so you can get away with many fewer locks and mutexes - you only need a mutex if there's a risk you might "block" between atomic operations.

Also unlike real threads, a greenlet can "kill" another greenlet - causing an exception to be raised in that greenlet when it next resumes.

  • Much easier for developers to understand.
  • Exceptions are raised in the right places, as soon as errors occur.

Disdvantages

Bad news: the Python 3 branch of gevent isn't finished. I have not investigated whether it is at all usable.

One solution is to allow different implementation languages in different parts of your stack. gevent is great at dealing with scalable IO, Python 3 is good at Unicode and user-facing applications. It may be possible to arrange to use the right tool for the job.

Pitfalls of asynchronous IO

Gevent shares several pitfalls with many asynchronous IO frameworks:

  • Blocking (real blocking, at the kernel level) somewhere in your program halts everything. This is most likely in C code where monkey patches don't take effect. You need to take careful steps to make your C library "green".
  • Keeping the CPU busy. greenlets are not preempted, so this will cause other greenlets never to be scheduled.
  • There exists the possibility of deadlock between greenlets.

preempted means to interrupt a running process when another process of similar priority needs to run, to avoid it hogging the CPU.

In summary gevent has very few pitfalls that are not present in other async IO frameworks.

One pitfall gevent sidesteps is that you are less likely to hit an async-unaware pure Python library that will block your app.

n to m concurrency

An approach for even better scalability is to run n greenlets on m physical threads. In Python we need to do this with processes. This gives very good performance on multiprocessor systems, as well as adding resilience.

This is the model that is used by Rust and Java among others.

Experiences with gevent

I evaluated gevent as well as the other systems mentioned in 2011. Gevent was the clear winner. There was little to choose from in terms of performance, but gevent's significantly simpler programming model was a major selling point. Not all developers are at the level where they are comfortable with generators, closures and callbacks, and gevent requires little of them.

The ability to use gevent with existing code, or business logic that is agnostic about IO, was very valuable too: you probably want to re-use certain business logic libraries in both high-performance network apps and offline batch processes.

Over the following 18 months we wrote a variety of network applications. One byproduct was a web service framework called nucleon, which aimed to connect RESTful JSON, PostgreSQL and AMQP, all with 'green' driver code for high scalability.

The AMQP library was originally a fork of Puka (not Pika), an AMQP library that didn't try to force its own brand of async on you (like Pika). I eventually completely rewrote this and split it into a separate project called nucleon.amqp. nucleon.amqp allows interaction with an AMQP server with a fully synchronous programming model - remote queues on the AMQP broker can be exposed locally with the Queue API.

As a team we adapted to gevent and found ourselves developing a language of diagrams to explain how the flow of control moves between greenlets, how they block and how they signal each other.

<Thank You!>