Diving into gevent

Echo server

from gevent.server import StreamServer


def connection_handler(sock, address):
    # Echo each line back to the client until it disconnects
    for line in sock.makefile('rb'):
        sock.sendall(line)


if __name__ == '__main__':
    server = StreamServer(('0.0.0.0', 8000), connection_handler)
    server.serve_forever()

Bulk File Downloader

from gevent import monkey
monkey.patch_all()

import urllib2
from gevent.pool import Pool


def download(url):
    return urllib2.urlopen(url).read()


if __name__ == '__main__':
    urls = ['http://httpbin.org/get'] * 100
    pool = Pool(20)
    print pool.map(download, urls)

Chat Server

import gevent
import gevent.queue

users = {}  # mapping of username -> Queue

def broadcast(msg):
    msg += '\n'
    for v in users.values():
        v.put(msg)

def writer(queue, sock):
    while True:
        msg = queue.get()
        sock.sendall(msg)

def reader(username, f):
    for l in f:
        msg = '%s> %s' % (username, l.strip())
        broadcast(msg)


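def handle(sock, client_addr):
    f = sock.makefile()

    name = read_name(f, sock)  # read_name() is not shown on the slide
    broadcast('## %s joined from %s.' % (name, client_addr[0]))

    users[name] = q = gevent.queue.Queue()

    r = gevent.spawn(reader, name, f)
    w = gevent.spawn(writer, q, sock)
    try:
        # The writer loops forever, so wait only until the first greenlet
        # exits (normally the reader, when the client disconnects)
        gevent.joinall([r, w], count=1)
    finally:
        gevent.killall([r, w])
        del users[name]
        broadcast('## %s left the chat.' % name)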

Async in Python

Synchronous I/O

Threads and processes: block, and let the kernel resume us.

Synchronous I/O

Drawbacks of threads:

Drawbacks of processes:

  • No shared memory space.
  • High memory use.

Asynchronous I/O

  • "Suspend" processing
  • Central place waits for I/O
  • "Resume" processing

In pseudocode

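import select

read_waiters = {}   # fd -> waiter suspended until fd is readable
write_waiters = {}  # fd -> waiter suspended until fd is writable

def event_loop():
    while True:
        # Block until at least one registered fd is ready
        read, write, error = select.select(
            list(read_waiters),
            list(write_waiters),
            [],
        )

        for fd in read:
            resume(read_waiters.pop(fd))  # switch back to the suspended waiter

        for fd in write:
            resume(write_waiters.pop(fd))

        # error handling omitted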
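Here is a minimal runnable sketch of the pseudocode in Python 3, using only the standard library. It assumes each waiter is a plain generator that yields the socket it wants to read, standing in for a greenlet. It handles only read-readiness. The resume() helper is hypothetical, and gevent's real hub is considerably more involved.

```python
import select
import socket

read_waiters = {}  # socket -> suspended generator waiting for it to become readable


def resume(waiter):
    """Run a suspended generator to its next yield and re-register what it waits on."""
    try:
        sock = next(waiter)  # the generator yields the socket it wants to read
    except StopIteration:
        return  # the task finished
    read_waiters[sock] = waiter


def event_loop():
    """Resume waiters as their sockets become readable; stop when none remain."""
    while read_waiters:
        readable, _, _ = select.select(list(read_waiters), [], [])
        for sock in readable:
            resume(read_waiters.pop(sock))


def read_once(sock, received):
    """A task: suspend until sock is readable, then record what arrived."""
    yield sock
    received.append(sock.recv(1024))


a, b = socket.socketpair()
received = []
resume(read_once(a, received))  # run the task up to its first suspension
b.sendall(b"hello")
event_loop()
print(received)  # [b'hello']
a.close()
b.close()
```

The task suspends at its yield. The loop resumes it only after select() reports the socket readable.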

With timeouts

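import heapq
import select
import time

timeout_waiters = []  # heap of (deadline, waiter)

def wait_for_timeout(delay, waiter):
    when = time.time() + delay
    heapq.heappush(timeout_waiters, (when, waiter))

def event_loop():
    while True:
        # Block no longer than the earliest deadline (forever if there is none)
        timeout = None
        if timeout_waiters:
            timeout = max(0, timeout_waiters[0][0] - time.time())
        read, write, error = select.select(rfds, wfds, efds, timeout)
        # ... resume read/write waiters as before ...

        # Resume every waiter whose deadline has passed
        now = time.time()
        while timeout_waiters and timeout_waiters[0][0] <= now:
            _, waiter = heapq.heappop(timeout_waiters)
            resume(waiter)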
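A runnable sketch of the timeout half of the loop, again using only the Python 3 standard library. It makes three assumptions:

  • Waiters are plain callbacks.
  • The loop calls time.sleep() where a real loop would pass the same value to select() as its timeout.
  • Each heap entry carries an added sequence number as a tie-breaker, so equal deadlines never compare the callbacks themselves.

It uses time.monotonic() because, unlike time.time(), it cannot jump backwards.

```python
import heapq
import itertools
import time

timeout_waiters = []      # heap of (deadline, seq, callback)
_seq = itertools.count()  # tie-breaker for equal deadlines


def wait_for_timeout(delay, callback):
    heapq.heappush(timeout_waiters, (time.monotonic() + delay, next(_seq), callback))


def event_loop():
    while timeout_waiters:
        # Sleep until the earliest deadline instead of busy-waiting
        delay = timeout_waiters[0][0] - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        # Fire every timer that is now due, in deadline order
        now = time.monotonic()
        while timeout_waiters and timeout_waiters[0][0] <= now:
            _, _, callback = heapq.heappop(timeout_waiters)
            callback()


fired = []
wait_for_timeout(0.03, lambda: fired.append("slow"))
wait_for_timeout(0.01, lambda: fired.append("fast"))
wait_for_timeout(0.02, lambda: fired.append("medium"))
event_loop()
print(fired)  # ['fast', 'medium', 'slow']
```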

Callbacks


def start_beer_request():
    http.get('/api/beer', handle_response)

def handle_response(resp):
    beer = load_beer(resp.json)
    do_something(beer)

def get_beer(beer_id, callback):
    def handle_response(resp):
        beer = load_beer(resp.json)
        callback(beer)

    http.get('/api/beer/%d' % beer_id, handle_response)

Callbacks in Pika (AMQP client)

import pika

def on_connected(connection):
    connection.channel(on_channel_open)

def on_channel_open(new_channel):
    global channel
    channel = new_channel
    channel.queue_declare(
        queue="test", durable=True, exclusive=False,
        auto_delete=False, callback=on_queue_declared)

def on_queue_declared(frame):
    channel.basic_consume(handle_delivery, queue='test')

def handle_delivery(channel, method, header, body):
    print body

parameters = pika.ConnectionParameters()
connection = pika.SelectConnection(parameters, on_connected)
connection.ioloop.start()

Pitfalls of callbacks

"Callbacks are the new Goto"

  • Untidy code structure

    • Must split all code into multiple parts
    • Function return values aren't useful
  • Extra effort to do error handling
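One concrete form of the error-handling cost: an exception raised inside a callback never reaches the code that started the operation, because that code has already returned. This sketch uses asyncio's low-level loop (call_soon() and set_exception_handler()) purely as a convenient real event loop. handle_response() is hypothetical.

```python
import asyncio


def handle_response(resp):
    # Hypothetical callback that fails while processing a response
    raise ValueError("bad response: %r" % resp)


caught_by_caller = []
seen_by_loop = []


def exception_handler(loop, context):
    # asyncio routes exceptions raised by plain callbacks here
    seen_by_loop.append(context["exception"])


loop = asyncio.new_event_loop()
loop.set_exception_handler(exception_handler)
try:
    # "Starting the request" only schedules the callback and returns at once
    loop.call_soon(handle_response, "{}")
except ValueError as exc:
    caught_by_caller.append(exc)  # never reached

loop.call_soon(loop.stop)
loop.run_forever()
loop.close()

print(caught_by_caller)                # []
print(type(seen_by_loop[0]).__name__)  # ValueError
```

The try/except around the request never fires. The error surfaces only in the loop's exception handler, far from the caller. That is why callback APIs have to pass errors along explicitly, for instance as a separate errback.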

Method-based callbacks in Twisted

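from twisted.internet.defer import DeferredQueue
from twisted.internet.protocol import ProcessProtocol


class BattleshipsProcessProtocol(ProcessProtocol):
    def __init__(self, name):
        self.name = name
        self.buf = ''                 # partial line carried over between reads
        self.queue = DeferredQueue()  # complete lines for consumers

    def errReceived(self, data):
        ...

    def outReceived(self, data):
        # Called whenever the child writes to stdout; data may contain
        # partial or multiple lines
        self.buf += data
        lines = self.buf.split('\n')
        self.buf = lines[-1]
        for l in lines[:-1]:
            self.queue.put(l)

    def close(self):
        self.transport.signalProcess('TERM')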

Method-based callbacks in asyncio

import asyncio

class EchoClient(asyncio.Protocol):
    message = 'This is the message. It will be echoed.'

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('data sent: {}'.format(self.message))

    def data_received(self, data):
        print('data received: {}'.format(data.decode()))

    def connection_lost(self, exc):
        print('server closed the connection')
        asyncio.get_event_loop().stop()

Generator-based Coroutine


import asyncio

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
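The @asyncio.coroutine decorator was deprecated in Python 3.8 and removed in Python 3.11. On Python 3.5+ the same example uses native coroutines (async def and await). This sketch adds a return value to print_sum() only so the result can be checked. Every level of the call stack still has to await, the same cooperation that yield from requires.

```python
import asyncio


async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)  # suspends this coroutine; the loop can run others
    return x + y


async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))
    return result


total = asyncio.run(print_sum(1, 2))
```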

Generators in Tornado

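from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from tornado.web import RequestHandler


class GenAsyncHandler(RequestHandler):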
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        do_something_with_response(response)
        self.render("template.html")

Generators vs coroutines

  • Generators provide coroutine-like functionality
  • But can only yield to their caller
  • Full coroutines can yield to any other coroutine (stack)
  • Using generators to implement coroutines requires all stack frames to collaborate
  • This requires code changes (eg. yield from)
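A small illustration of the last two points in plain Python 3 (the function names are made up for the demo). A generator can only suspend back to its direct caller. The outer function is suspended along with the inner one only if it re-yields with yield from.

```python
def inner():
    # A generator can only suspend back to whoever called next() on it
    yield "inner paused"
    return "inner result"


def outer_without_yield_from():
    # Calling inner() only builds a generator object: none of its body runs,
    # and inner has no way to suspend this frame
    result = inner()
    yield "outer got %s" % type(result).__name__


def outer_with_yield_from():
    # yield from passes inner's suspensions up through this frame as well
    result = yield from inner()
    yield "outer got %r" % result


print(list(outer_without_yield_from()))  # ['outer got generator']
print(list(outer_with_yield_from()))     # ['inner paused', "outer got 'inner result'"]
```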

Greenlets/green threads


import gevent

def compute(x, y):
    print "Compute %s + %s ..." % (x, y)
    gevent.sleep(1.0)
    return x + y

def print_sum(x, y):
    result = compute(x, y)
    print "%s + %s = %s" % (x, y, result)

print_sum(1, 2)

Gevent hub

  • gevent.sleep() doesn't really block
  • It switches to the hub
  • The hub can run other greenlets meanwhile
  • Only when there is nothing else to do does the hub block (eg. on select)

Gevent: greenlets plus monkey-patching

# These two lines have to be the first thing in your program, before
# anything else is imported
from gevent.monkey import patch_all
patch_all()

import time

def compute(x, y):
    print "Compute %s + %s ..." % (x, y)
    time.sleep(1.0)
    return x + y

def print_sum(x, y):
    result = compute(x, y)
    print "%s + %s = %s" % (x, y, result)

print_sum(1, 2)

Isn't monkey-patching bad?

  • Think of it as a distribution of Python
  • Always patch before any other import
  • It is optional
    • You can't rely on it if writing libraries
    • You can enforce it in frameworks

Significant advantages:

  • Works with any existing synchronous, pure Python code.
  • Generally works with async code too (only select() is emulated).
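A standard-library sketch of what monkey-patching does, and why it must come before other imports. green_sleep() is a hypothetical stand-in for a cooperative sleep. gevent's patch_all() does the equivalent replacement for modules such as socket, select and time. Code that looks a function up on the module at call time sees the patch. A name bound earlier, as from time import sleep does, keeps the original.

```python
import time
from time import sleep as imported_early  # bound before patching, like an early import

calls = []


def green_sleep(seconds):
    """Hypothetical cooperative sleep; a real one would yield to the hub."""
    calls.append(seconds)


original_sleep = time.sleep
time.sleep = green_sleep  # the monkey-patch: replace the module attribute
try:
    time.sleep(0.5)       # looked up at call time, so the patch applies
    imported_early(0.01)  # old reference: still the real, blocking sleep
finally:
    time.sleep = original_sleep  # undo the patch after the demo

print(calls)  # [0.5]
```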

gevent concurrency primitives

Spawning and killing greenlets

  • gevent.spawn(function, *args, **kwargs) - spawn a greenlet
  • gevent.kill(greenlet, exception=GreenletExit) - 'kill' a greenlet by raising an exception in that greenlet.
  • gevent.pool - greenlet-based equivalent to multiprocessing.pool.
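Killing a greenlet by raising an exception inside it means its finally blocks and context managers still get to clean up. The closest standard-library analog is asyncio's Task.cancel(), which raises CancelledError at the point where the task is suspended. This is not gevent's API, and worker() is hypothetical.

```python
import asyncio

cleanup = []


async def worker():
    try:
        await asyncio.sleep(3600)  # blocked "forever"
    finally:
        cleanup.append("worker cleaned up")  # runs because cancel() raises inside


async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start and block
    task.cancel()           # raises CancelledError at the worker's await
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"


outcome = asyncio.run(main())
print(outcome)  # cancelled
print(cleanup)  # ['worker cleaned up']
```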

Synchronisation primitives

  • gevent.lock.Semaphore
  • gevent.lock.RLock
  • gevent.event.Event

Message Passing

  • gevent.queue.Queue
  • gevent.event.AsyncResult - block waiting for a single result. Also allows an exception to be raised instead.
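As a rough standard-library analog of AsyncResult, concurrent.futures.Future is a one-shot container for a value or an exception. It uses threads rather than greenlets and is not gevent's API. The waiter blocks in result() until a producer calls set_result() or set_exception(). producer() is hypothetical.

```python
import threading
from concurrent.futures import Future


def producer(fut, value):
    """Hypothetical producer: deliver one result to whoever is waiting."""
    fut.set_result(value)


answer = Future()
threading.Thread(target=producer, args=(answer, 42)).start()
print(answer.result(timeout=5))  # blocks until set_result() is called -> 42

failure = Future()
failure.set_exception(KeyError("missing"))
try:
    failure.result()  # the stored exception is raised in the waiter
except KeyError as exc:
    print("raised:", exc)  # raised: 'missing'
```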

Timeouts

from gevent import Timeout

# Raises gevent.Timeout in this greenlet if recv() takes longer than 5 seconds
with Timeout(5):
    data = sock.recv(4096)

Higher level server kit

  • gevent.server.StreamServer - TCP server. Also supports SSL.
  • gevent.server.DatagramServer - UDP server.
  • gevent.pywsgi.WSGIServer - WSGI server that supports streaming, keepalives, SSL etc.

Why do we want a synchronous programming model?

No code changes required to business logic:

  • Async code can call business logic
  • Business logic can call async code

from gevent.socket import create_connection

def process_orders(lines):
    """Do something important with an iterable of lines."""
    for l in lines:
        ...

socket = create_connection((host, port))  # host, port: wherever the orders come from
f = socket.makefile(mode='r')
process_orders(f)  # the business logic neither knows nor cares that I/O is async
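The slide elides the body of process_orders(), so the one below is a hypothetical filler that totals 'sku,qty' lines. Because the function only iterates over lines, the same code runs unchanged on a list in a unit test, on an in-memory file, or on the makefile() of a gevent socket.

```python
import io


def process_orders(lines):
    """Hypothetical business logic: total the quantity field of 'sku,qty' lines."""
    total = 0
    for line in lines:
        sku, qty = line.strip().split(",")
        total += int(qty)
    return total


# The same function accepts a list or any file-like object
print(process_orders(["apple,2", "pear,3"]))             # 5
print(process_orders(io.StringIO("apple,2\npear,3\n")))  # 5
```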

Advantages vs blocking I/O

  • Low memory usage
  • Shared memory
  • Fewer mutexes and locking primitives required.
  • A greenlet can kill another greenlet.

Advantages vs other async approaches

  • Much easier for developers to understand.
  • No code changes.
  • Exceptions are raised in the right places, as soon as errors occur.
  • (Works on Windows).

Disadvantages

  • Python 3 branch isn't finished.
  • Deadlock between greenlets is still possible.

Pitfalls of asynchronous I/O

  • A truly blocking call anywhere in your program halts everything.
    • Typically C libraries
    • Pure Python libraries are protected from this by monkey-patching, unlike in other frameworks
  • Keeping the CPU busy prevents other greenlets getting service.
    • Make sure CPU-bound activity is done in other processes (eg. queue-mediated)
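A deterministic sketch of the CPU-bound pitfall. Generators stand in for greenlets, and a hypothetical round-robin scheduler stands in for the gevent hub. gevent switches at I/O calls rather than at explicit yields, but the consequence is the same: while cpu_hog() computes, nothing else runs.

```python
from collections import deque


def run(tasks):
    """Round-robin scheduler: each task runs until its next yield."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)
        except StopIteration:
            continue            # task finished; drop it
        ready.append(task)      # task yielded; give it another turn later


log = []


def ticker():
    for i in range(3):
        log.append("tick %d" % i)
        yield                   # cooperative switch point


def cpu_hog():
    sum(range(10 ** 6))         # CPU-bound work with no switch point
    log.append("hog done")
    yield


run([ticker(), cpu_hog()])
print(log)  # ['tick 0', 'hog done', 'tick 1', 'tick 2']
```

"tick 1" has to wait for the whole computation, however long it takes. Moving such work into another process keeps the other greenlets responsive.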

Experiences with gevent

Gevent I/O patterns

One greenlet per direction of I/O:

def reader(file):
    """Reader greenlet: read and re-broadcast lines of file."""
    for msg in file:  # blocks on I/O
        for q in queues:
            q.put(msg)

def writer(queue, sock):
    """Writer greenlet: feed messages from our queue to sock."""
    while True:
        msg = queue.get()  # blocks waiting for message
        sock.sendall(msg)  # blocks on I/O
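For comparison, here is the same one-task-per-direction pattern translated to asyncio (Python 3.7+). This is a sketch, not gevent code. The list of lines stands in for the client's file, and the lists in sent stand in for two client sockets. Every blocking point now needs an explicit await, which is exactly the code change the gevent version avoids.

```python
import asyncio


async def reader(lines, queues):
    """Reader task: re-broadcast each incoming line to every client's queue."""
    for msg in lines:
        for q in queues:
            await q.put(msg)
        await asyncio.sleep(0)  # a real read would suspend here


async def writer(queue, sent):
    """Writer task: feed messages from this client's queue to its (fake) socket."""
    while True:
        msg = await queue.get()
        sent.append(msg)
        queue.task_done()


async def main():
    queues = [asyncio.Queue(), asyncio.Queue()]
    sent = [[], []]  # stand-ins for two client sockets
    writers = [asyncio.create_task(writer(q, s)) for q, s in zip(queues, sent)]
    await reader(["hi\n", "bye\n"], queues)
    for q in queues:
        await q.join()  # wait until every queued message has been "sent"
    for w in writers:
        w.cancel()      # writers loop forever, so stop them explicitly
    return sent


result = asyncio.run(main())
print(result)  # [['hi\n', 'bye\n'], ['hi\n', 'bye\n']]
```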

n to m concurrency

  • Run n greenlets on m physical threads.
  • In Python, this works with processes (m processes, each running many greenlets)
    • Great performance on multiprocessor systems
    • Additional resilience
  • Used in Java, Go, Rust etc


<Thank You!>