
Creating a processing queue in Tornado

Tags: python, tornado

I'm using a Tornado web server to queue up items that need to be processed outside of the request/response cycle.

In my simplified example below, every time a request comes in, I add a new string to a list called queued_items. I want to create something that will watch that list and process the items as they show up in it.

(In my real code, the items are processed and sent over a TCP socket, which may or may not be connected when the web request arrives. I want the web server to keep queuing up items regardless of the state of the socket connection.)

I'm trying to keep this code simple and not use external queues/programs like Redis or Beanstalk. It's not going to have very high volume.

What's a good way, using Tornado idioms, to watch the client.queued_items list for new items and process them as they arrive?

import time

import tornado.ioloop
import tornado.gen
import tornado.web

class Client():

    def __init__(self):
        self.queued_items = []

    @tornado.gen.coroutine
    def watch_queue(self):
        # I have no idea what I'm doing
        items = yield client.queued_items
        # go_do_some_thing_with_items(items)

class IndexHandler(tornado.web.RequestHandler):

    def get(self):
        client.queued_items.append("%f" % time.time())
        self.write("Queued a new item")

if __name__ == "__main__":

    client = Client()

    # Watch the queue for when new items show up
    client.watch_queue()

    # Create the web server 
    application = tornado.web.Application([
        (r'/', IndexHandler),
    ], debug=True)

    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
Asked by Scott on Mar 30 '15 at 18:03


1 Answer

There is a library called toro, which provides synchronization primitives for Tornado. [Update: As of Tornado 4.2, toro has been merged into Tornado; its primitives are available in the tornado.locks and tornado.queues modules.]

It sounds like you could use a toro.Queue (or tornado.queues.Queue in Tornado 4.2+) to handle this:

import time

import toro
import tornado.ioloop
import tornado.gen
import tornado.web

class Client():

    def __init__(self):
        self.queued_items = toro.Queue()

    @tornado.gen.coroutine
    def watch_queue(self):
        while True:
            # get() waits until an item is available and yields one item at a time
            item = yield self.queued_items.get()
            # go_do_something_with_item(item)

class IndexHandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def get(self):
        yield client.queued_items.put("%f" % time.time())
        self.write("Queued a new item")

if __name__ == "__main__":

    client = Client()

    # Watch the queue for when new items show up
    tornado.ioloop.IOLoop.current().add_callback(client.watch_queue)

    # Create the web server 
    application = tornado.web.Application([
        (r'/', IndexHandler),
    ], debug=True)

    application.listen(8888)
    tornado.ioloop.IOLoop.current().start()

There are a few tweaks required, aside from switching the data structure from a list to a toro.Queue:

  1. We need to schedule watch_queue to run inside the IOLoop using add_callback, rather than trying to call it directly outside of an IOLoop context.
  2. IndexHandler.get needs to be converted to a coroutine, because toro.Queue.put is a coroutine.
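As a side note on item 2: if the queue is left unbounded (the default), a handler could probably also enqueue with put_nowait, which is a plain method rather than a coroutine, so get would not have to become a coroutine. A minimal sketch, assuming Tornado 4.2+ and its tornado.queues module; the item strings and variable names are made up for illustration:

```python
from tornado import queues

# Unbounded queue: maxsize defaults to 0, so put_nowait never raises QueueFull.
queued_items = queues.Queue()
queued_items.put_nowait("first")
queued_items.put_nowait("second")
print(queued_items.qsize())

# With a bound, put_nowait raises QueueFull instead of waiting for free space.
bounded = queues.Queue(maxsize=1)
bounded.put_nowait("only slot")
try:
    bounded.put_nowait("overflow")
    overflowed = False
except queues.QueueFull:
    overflowed = True
print(overflowed)
```

The coroutine form (yield put(...)) is still the safer choice if a maxsize is ever set, because it waits for space instead of raising.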

I also added a while True loop to watch_queue, so that it will run forever, rather than just processing one item and then exiting.
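
For Tornado 4.2 and later, the same pattern can be sketched with tornado.queues.Queue instead of toro. The version below assumes Tornado 6 on Python 3 (native coroutines and asyncio.run) and leaves out the web server so that it can run and exit by itself. The processed list and the demo function are hypothetical stand-ins for the real TCP send and for the request handlers:

```python
import asyncio

from tornado import ioloop, queues


class Client:
    def __init__(self):
        # Unbounded queue (maxsize=0 is the default), so put() never has to wait.
        self.queued_items = queues.Queue()
        # Hypothetical stand-in for sending items over the TCP socket.
        self.processed = []

    async def watch_queue(self):
        # Loops forever, waking up whenever an item becomes available.
        while True:
            item = await self.queued_items.get()
            try:
                self.processed.append(item)
            finally:
                # Lets join() know that this item has been handled.
                self.queued_items.task_done()


async def demo():
    client = Client()
    # Start the consumer in the background on the running IOLoop.
    ioloop.IOLoop.current().spawn_callback(client.watch_queue)
    # Stand-in for three web requests, each queuing one item.
    for n in range(3):
        await client.queued_items.put("item-%d" % n)
    # Wait until every queued item has been marked done.
    await client.queued_items.join()
    return client.processed


result = asyncio.run(demo())
print(result)
```

In a real server the consumer would be started the same way before the IOLoop begins serving requests, and the handler would take the place of the loop in demo.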

Answered by dano on Oct 21 '22 at 23:10