from gevent.server import StreamServer
def connection_handler(socket, address):
    """Echo each line read from the connection back to the sender.

    ``socket`` is the connected socket handed to the handler and
    ``address`` is the peer address, which is not used here.
    """
    incoming = socket.makefile('r')
    for line in incoming:
        socket.sendall(line)
if __name__ == '__main__':
    # Serve the echo handler on every interface, port 8000, until killed.
    listen_on = ('0.0.0.0', 8000)
    server = StreamServer(listen_on, connection_handler)
    server.serve_forever()
from gevent import monkey
monkey.patch_all()
import urllib2
from gevent.pool import Pool
def download(url):
    """Fetch ``url`` and return the raw response body.

    The response object is closed as soon as the body has been read, so
    connections are not left open while many downloads run concurrently.
    Any error raised by ``urllib2.urlopen`` or ``read`` propagates.
    """
    # urllib2 responses are not context managers on Python 2, hence closing().
    from contextlib import closing

    with closing(urllib2.urlopen(url)) as response:
        return response.read()
if __name__ == '__main__':
    # Download the same URL 100 times through a pool of size 20 and
    # print the list of response bodies.
    pool = Pool(20)
    urls = ['http://httpbin.org/get'] * 100
    bodies = pool.map(download, urls)
    print(bodies)
# Registry of connected chat users. broadcast() puts each message on every
# queue stored here; handle() adds and removes entries.
users = {} # mapping of username -> Queue
def broadcast(msg):
    """Put ``msg``, terminated with a newline, on every user's queue."""
    line = msg + '\n'
    for outbox in users.values():
        outbox.put(line)
def writer(queue, sock):
    """Send every message taken from ``queue`` to ``sock``.

    The loop has no exit condition: it only ends if ``queue.get()`` or
    ``sock.sendall()`` raises.
    """
    while True:
        sock.sendall(queue.get())
def reader(username, f):
    """Broadcast each line of ``f``, stripped and prefixed with ``username``.

    Returns when iteration over ``f`` is exhausted.
    """
    for raw_line in f:
        text = raw_line.strip()
        broadcast('%s> %s' % (username, text))
def handle(sock, client_addr):
    """Serve one chat client from join to departure.

    Reads the client's name, announces the join, registers a message queue
    for the client in ``users`` and runs a reader and a writer greenlet for
    the connection. When either greenlet finishes, the other is killed, the
    client is unregistered and the departure is announced.

    ``client_addr`` is indexed with ``[0]`` for the join announcement.
    """
    f = sock.makefile()
    name = read_name(f, sock)
    broadcast('## %s joined from %s.' % (name, client_addr[0]))
    users[name] = q = gevent.queue.Queue()
    workers = []
    try:
        workers.append(gevent.spawn(reader, name, f))
        workers.append(gevent.spawn(writer, q, sock))
        # writer() loops forever, so waiting for *both* greenlets would keep
        # this handler (and the user's registration) alive after the client
        # has gone. Wake up as soon as either greenlet finishes instead.
        gevent.joinall(workers, count=1)
    finally:
        # Stop whichever greenlet is still running.
        gevent.killall(workers)
        # Only remove our own registration: a later client that joined under
        # the same name has replaced the entry with its own queue.
        if users.get(name) is q:
            del users[name]
        broadcast('## %s left the chat.' % name)
Threads and processes: block and let the kernel resume us.
Drawbacks of threads:
Drawbacks of processes:
# fd -> waiter to resume once select() reports the fd as readable.
read_waiters = {}
# fd -> waiter to resume once select() reports the fd as writable.
write_waiters = {}
def event_loop():
    """Run forever, resuming waiters whose file descriptor became ready.

    Each pass waits in ``select`` on the registered fds, then pops and
    resumes the waiter of every readable fd followed by every writable fd.
    A waiter is removed from its table when it is resumed.
    """
    while True:
        readable, writable, errored = select.select(
            read_waiters.keys(),
            write_waiters.keys(),
            ...
        )
        ready = ((read_waiters, readable), (write_waiters, writable))
        for waiters, fds in ready:
            for fd in fds:
                resume(waiters.pop(fd))
        # something about errors
# Heap (maintained with heapq) of (deadline, waiter) tuples; the entry with
# the earliest deadline is at index 0.
timeout_waiters = []
def wait_for_timeout(delay, waiter):
    """Register ``waiter`` on the timeout heap, due ``delay`` seconds from now."""
    deadline = time.time() + delay
    entry = (deadline, waiter)
    heapq.heappush(timeout_waiters, entry)
def event_loop():
    """Run forever: wait for I/O, then resume every waiter whose timeout expired.

    Each pass sleeps in ``select`` for at most the time remaining until the
    earliest deadline on ``timeout_waiters`` (indefinitely when the heap is
    empty), then pops and resumes every waiter whose deadline has passed.
    """
    while True:
        if timeout_waiters:
            # Sleep only until the earliest deadline. Clamp at zero because
            # select() rejects a negative timeout when a deadline is overdue.
            timeout = max(0, timeout_waiters[0][0] - time.time())
        else:
            # Nothing scheduled: block until I/O is ready.
            timeout = None
        # Dispatch of the ready descriptors is elided in this snippet.
        read, write, error = select.select(rfds, wfds, efds, timeout)
        # Read the clock *after* blocking so that deadlines which passed
        # while we were inside select() are honoured on this pass.
        now = time.time()
        # Stop at the first entry that is not yet due; without that check in
        # the loop condition a pending future deadline would spin forever.
        while timeout_waiters and timeout_waiters[0][0] <= now:
            _, waiter = heapq.heappop(timeout_waiters)
            resume(waiter)
def start_beer_request():
    """Start a GET of /api/beer, passing handle_response as the callback."""
    url = '/api/beer'
    http.get(url, handle_response)
def handle_response(resp):
    """Load a beer from ``resp.json`` and hand it to do_something()."""
    do_something(load_beer(resp.json))
def get_fruit(beer_id, callback):
    """Request /api/beer/<beer_id> and pass the loaded beer to ``callback``.

    Nothing is returned; the result is only delivered through ``callback``.
    """
    def on_response(resp):
        # Convert the raw response before handing it on.
        callback(load_beer(resp.json))

    url = '/api/beer/%d' % beer_id
    http.get(url, on_response)
def on_connected(connection):
    """Ask ``connection`` for a channel, with on_channel_open as the callback."""
    open_channel = connection.channel
    open_channel(on_channel_open)
def on_channel_open(new_channel):
    """Store the channel in the module global and declare the "test" queue.

    on_queue_declared is passed as the callback of the declaration.
    """
    global channel
    channel = new_channel
    declare_options = dict(
        queue="test",
        durable=True,
        exclusive=False,
        auto_delete=False,
        callback=on_queue_declared,
    )
    new_channel.queue_declare(**declare_options)
def on_queue_declared(frame):
    """Start consuming the 'test' queue with handle_delivery as the consumer.

    ``frame`` is accepted because this is used as a callback; it is not read.
    """
    consume = channel.basic_consume
    consume(handle_delivery, queue='test')
def handle_delivery(channel, method, header, body):
    """Print the body of a delivered message.

    ``channel``, ``method`` and ``header`` are accepted but not used.
    """
    # Call form of print: valid on Python 3 and, with a single argument,
    # prints exactly the same text on Python 2.
    print(body)
# Connect with default parameters, passing on_connected as the callback,
# then start the connection's I/O loop.
parameters = pika.ConnectionParameters()
connection = pika.SelectConnection(parameters, on_connected)
connection.ioloop.start()
"Callbacks are the new Goto"
Untidy code structure
- Must split all code into multiple parts
- Function return values aren't useful
Extra effort to do error handling
class BattleshipsProcessProtocol(ProcessProtocol):
    """Process protocol that splits the child's stdout into lines.

    Complete lines (without the trailing newline) are put on ``self.queue``;
    a trailing partial line is buffered until the rest of it arrives.
    """

    def __init__(self, name):
        # Keep the name instead of silently discarding the argument.
        self.name = name
        # Must exist before the first outReceived() call, which appends to it.
        # A bytes literal works for Python 2 str data and Python 3 bytes data.
        self.buf = b''
        self.queue = DeferredQueue()

    def errReceived(self, data):
        # Body elided in the original snippet.
        ...

    def outReceived(self, data):
        """Buffer ``data`` and queue every line completed by it."""
        self.buf += data
        lines = self.buf.split(b'\n')
        # The last element is the (possibly empty) unfinished line.
        self.buf = lines.pop()
        for line in lines:
            self.queue.put(line)

    def close(self):
        """Send the TERM signal to the child process."""
        self.transport.signalProcess('TERM')
import asyncio
class EchoClient(asyncio.Protocol):
    """Protocol that sends one fixed message and prints whatever comes back."""

    message = 'This is the message. It will be echoed.'

    def connection_made(self, transport):
        """Write the encoded message to ``transport`` and report it."""
        payload = self.message.encode()
        transport.write(payload)
        print('data sent: {}'.format(self.message))

    def data_received(self, data):
        """Print the decoded form of ``data``."""
        text = data.decode()
        print('data received: {}'.format(text))

    def connection_lost(self, exc):
        """Report the closed connection and stop the event loop."""
        print('server closed the connection')
        loop = asyncio.get_event_loop()
        loop.stop()
import asyncio
@asyncio.coroutine
def compute(x, y):
    """Coroutine: announce the sum, sleep one second, then return ``x + y``."""
    operands = (x, y)
    print("Compute %s + %s ..." % operands)
    yield from asyncio.sleep(1.0)
    return x + y
@asyncio.coroutine
def print_sum(x, y):
    """Coroutine: wait for ``compute(x, y)`` and print the resulting sum."""
    total = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, total))
# Drive print_sum(1, 2) to completion on the event loop.
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(print_sum(1, 2))
finally:
    # Close the loop even when the coroutine raises.
    loop.close()
class GenAsyncHandler(RequestHandler):
    """Request handler whose GET fetches a remote page before rendering."""

    @gen.coroutine
    def get(self):
        """Fetch http://example.com, process the response, render the template."""
        fetch = AsyncHTTPClient().fetch
        response = yield fetch("http://example.com")
        do_something_with_response(response)
        self.render("template.html")
import gevent
def compute(x, y):
    """Announce the sum, sleep one second via gevent, then return ``x + y``."""
    # Call form of print: valid on Python 3 and, with a single argument,
    # prints exactly the same text on Python 2.
    print("Compute %s + %s ..." % (x, y))
    gevent.sleep(1.0)
    return x + y
def print_sum(x, y):
    """Call ``compute(x, y)`` and print the operands with the result."""
    result = compute(x, y)
    # Call form of print: valid on Python 3 and, with a single argument,
    # prints exactly the same text on Python 2.
    print("%s + %s = %s" % (x, y, result))
# Run the example with the operands 1 and 2.
print_sum(1, 2)
# These two lines have to be the first thing in your program, before
# anything else is imported
from gevent.monkey import patch_all
patch_all()
import time
def compute(x, y):
    """Announce the sum, call ``time.sleep`` for one second, return ``x + y``."""
    # Call form of print: valid on Python 3 and, with a single argument,
    # prints exactly the same text on Python 2.
    print("Compute %s + %s ..." % (x, y))
    time.sleep(1.0)
    return x + y
def print_sum(x, y):
    """Call ``compute(x, y)`` and print the operands with the result."""
    result = compute(x, y)
    # Call form of print: valid on Python 3 and, with a single argument,
    # prints exactly the same text on Python 2.
    print("%s + %s = %s" % (x, y, result))
# Run the example with the operands 1 and 2.
print_sum(1, 2)
Significant advantages:
from gevent import Timeout
# Run the receive under a 5-second gevent Timeout.
with Timeout(5):
    # recv() requires the maximum number of bytes to read per call.
    data = sock.recv(4096)
No code changes required to business logic:
from gevent.socket import create_connection
def process_orders(lines):
    """Consume an iterable of lines; the per-line work is elided here."""
    for line in lines:
        ...
# Connect, wrap the socket in a file-like reader and feed its lines to the
# unchanged business logic.
socket = create_connection((host, port))
f = socket.makefile(mode='r')
# The function defined for this job is process_orders, not line_reader.
process_orders(f)
One greenlet per direction of I/O:
def reader(file):
    """Reader greenlet: copy every line of ``file`` onto each queue in ``queues``."""
    for line in file:  # blocks on I/O
        for outbox in queues:
            outbox.put(line)
def writer(queue, sock):
    """Writer greenlet: send every message taken from ``queue`` to ``sock``.

    The loop has no exit condition; it ends only if a call raises.
    """
    while True:
        # get() blocks waiting for a message, sendall() blocks on I/O.
        sock.sendall(queue.get())
nucleon - http://nucleon.readthedocs.org/en/latest/
nucleon.amqp - http://pythonhosted.org/nucleon.amqp/