I'm using a Tornado web server to queue up items that need to be processed outside of the request/response cycle.
In my simplified example below, every time a request comes in, I add a new string to a list called queued_items. I want to create something that will watch that list and process the items as they show up in it.
(In my real code, the items are processed and sent over a TCP socket which may or may not be connected when the web request arrives. I want the web server to keep queuing up items regardless of the socket connection)
I'm trying to keep this code simple and not use external queues/programs like Redis or Beanstalk. It's not going to have very high volume.
What's a good way, using Tornado idioms, to watch the client.queued_items list for new items and process them as they arrive?
import time
import tornado.ioloop
import tornado.gen
import tornado.web

class Client():
    def __init__(self):
        self.queued_items = []

    @tornado.gen.coroutine
    def watch_queue(self):
        # I have no idea what I'm doing
        items = yield client.queued_items
        # go_do_some_thing_with_items(items)

class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        client.queued_items.append("%f" % time.time())
        self.write("Queued a new item")

if __name__ == "__main__":
    client = Client()
    # Watch the queue for when new items show up
    client.watch_queue()
    # Create the web server
    application = tornado.web.Application([
        (r'/', IndexHandler),
    ], debug=True)
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
There is a library called toro, which provides synchronization primitives for tornado. [Update: As of tornado 4.2, toro has been merged into tornado.]
Sounds like you could just use a toro.Queue (or tornado.queues.Queue in tornado 4.2+) to handle this:
import time
import toro
import tornado.ioloop
import tornado.gen
import tornado.web

class Client():
    def __init__(self):
        self.queued_items = toro.Queue()

    @tornado.gen.coroutine
    def watch_queue(self):
        while True:
            items = yield self.queued_items.get()
            # go_do_something_with_items(items)

class IndexHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        yield client.queued_items.put("%f" % time.time())
        self.write("Queued a new item")

if __name__ == "__main__":
    client = Client()
    # Watch the queue for when new items show up
    tornado.ioloop.IOLoop.current().add_callback(client.watch_queue)
    # Create the web server
    application = tornado.web.Application([
        (r'/', IndexHandler),
    ], debug=True)
    application.listen(8888)
    tornado.ioloop.IOLoop.current().start()
There are a few tweaks required, aside from switching the data structure from a list to a toro.Queue:
1. watch_queue needs to be scheduled to run inside the IOLoop using add_callback, rather than being called directly outside of an IOLoop context.
2. IndexHandler.get needs to be converted to a coroutine, because toro.Queue.put is a coroutine.
I also added a while True loop to watch_queue, so that it runs forever, rather than processing one item and then exiting.
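For tornado 4.2+, where the queue lives in tornado.queues, the same producer/consumer pattern can be sketched without the web server. This is only a minimal illustration, assuming a recent Tornado (6.x) on Python 3.7+, and it uses native async/await instead of @tornado.gen.coroutine with yield. The processed list, the "item-N" strings, and the main function are hypothetical stand-ins, not part of the original code.

```python
import asyncio

from tornado.ioloop import IOLoop
from tornado.queues import Queue


class Client:
    def __init__(self):
        self.queued_items = Queue()
        # Hypothetical stand-in for the real work (e.g. a TCP send).
        self.processed = []

    async def watch_queue(self):
        # Loop forever, waking up each time an item is available.
        while True:
            item = await self.queued_items.get()
            try:
                self.processed.append(item)
            finally:
                # Mark the item as handled so that join() can return.
                self.queued_items.task_done()


async def main():
    client = Client()
    # Schedule the watcher on the running IOLoop instead of calling it directly.
    IOLoop.current().spawn_callback(client.watch_queue)
    # Stand-in for the request handler: queue three items.
    for n in range(3):
        await client.queued_items.put("item-%d" % n)
    # Wait until every queued item has been marked done.
    await client.queued_items.join()
    return client.processed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the web server version, the put call would sit in IndexHandler.get as in the code above, and the watcher would keep running for as long as the IOLoop does.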