
Adapting celery.task.http.URL for tornado

Celery includes a module that can make asynchronous HTTP requests using amqp or some other Celery backend. I am using the tornado-celery producer for asynchronous message publishing. As I understand it, tornado-celery uses pika for this. The question is how to adapt celery.task.http.URL for Tornado (make it non-blocking). There are basically two places that have to be refined:

  1. HttpDispatch.make_request() has to be implemented using the Tornado async HTTP client;
  2. URL.get_async(**kw) or URL.post_async(**kw) must be reimplemented with corresponding non-blocking code using the Tornado API. For instance:

    class NonBlockingURL(celery.task.http.URL):
    
        @gen.coroutine
        def post_async(self, **kwargs):
            async_res = yield gen.Task(self.dispatcher.delay, 
                                       str(self), 'POST', **kwargs)
            raise gen.Return(async_res)
    

But I could not work out how to do this in a proper and concise way. How can I make it fully non-blocking and asynchronous? By the way, I am using the amqp backend.

Please give me a good guideline or, even better, an example.

Rustem K asked Sep 10 '13 05:09



1 Answer

In fact, you have to decide whether to use Tornado's async method or a queue like Celery. There is no point in using both, because the queue answers rapidly about its status, so there is no point in Tornado doing something else while waiting for the queue to respond. To decide between the two solutions, I would say:

Celery: more modular, easy to distribute across different cores or different machines, the tasks can be used by programs other than Tornado, but you have to install and keep running extra software (amqp, Celery workers...)

Async in Tornado: more monolithic, one program does everything, shorter code, one program to run
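For the Tornado-only route, a non-blocking HTTP call could look roughly like the sketch below. It is only an illustration, assuming a Tornado version that supports native coroutines (`async def` handlers); `fetch_body`, `RelayHandler`, `make_app` and the `upstream_url` parameter are hypothetical names, not part of Celery or of the question's code.

```python
from tornado.httpclient import AsyncHTTPClient
from tornado.web import Application, RequestHandler


async def fetch_body(url):
    """Fetch `url` without blocking the IOLoop and return the raw body."""
    client = AsyncHTTPClient()
    # By default fetch() raises tornado.httpclient.HTTPError on non-2xx replies.
    response = await client.fetch(url)
    return response.body


class RelayHandler(RequestHandler):
    """Hypothetical handler that relays the body of a fixed upstream URL."""

    def initialize(self, upstream_url):
        # Tornado passes the dict from the route spec to initialize().
        self.upstream_url = upstream_url

    async def get(self):
        # While this request awaits the upstream reply, the IOLoop is free
        # to serve other requests.
        body = await fetch_body(self.upstream_url)
        self.write(body)


def make_app(upstream_url):
    """Build an application with a single /relay route."""
    return Application([
        (r"/relay", RelayHandler, {"upstream_url": upstream_url}),
    ])
```

With this approach no broker or worker is involved: the handler itself awaits the outgoing request.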

To use the async method of Tornado, refer to the documentation. Here is a short solution using celery and tornado together:

tasks.py

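    from celery import Celery, current_task
    import time

    # The broker URL is an assumption (a local RabbitMQ). The 'amqp' result
    # backend is the one named in the question; recent Celery releases
    # dropped it in favour of 'rpc://'.
    celery = Celery('tasks', broker='amqp://', backend='amqp')

    @celery.task
    def MyTask(url, resid):
        # Simulate ten seconds of work, publishing progress after each step.
        for i in range(10):
            time.sleep(1)
            current_task.update_state(state='running', meta={'i': i})
        return 'done'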

server.py

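    import itertools

    import tasks
    from tornado.web import Application, RequestHandler

    dictasks = {}                  # task id -> Celery AsyncResult
    next_id = itertools.count()    # ids stay unique even after entries are deleted

    class runtask(RequestHandler):
        def post(self):
            i = next(next_id)
            # MyTask takes (url, resid); reading 'url' from the request is an assumption.
            dictasks[i] = tasks.MyTask.delay(self.get_argument('url', ''), i)
            self.write(str(i))     # write() does not accept an int

    class chktask(RequestHandler):
        def get(self, i):
            i = int(i)
            if i not in dictasks:
                self.send_error(404)
                return
            res = dictasks[i]
            if res.ready():
                self.write(str(res.result))
                del dictasks[i]
            else:
                # info holds the meta dict once the task has reported progress
                info = res.info if isinstance(res.info, dict) else {}
                self.write('%s i:%s' % (res.state, info.get('i', -1)))

    application = Application([
        (r"/runtask", runtask),
        (r"/chktask/([0-9]+)", chktask),
        # etc.
    ])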
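If the handler should hold the request open until the task finishes instead of making the client poll, one option is to poll the result from a coroutine so that the IOLoop stays free between checks. The sketch below is only an illustration: `wait_for_result`, `interval` and `timeout` are hypothetical names, it assumes Tornado 5 or later (where the IOLoop runs on asyncio, so `asyncio.sleep` can be awaited in a handler) and Python 3.7+, and it relies only on the `ready()` and `result` members already used above. Each `ready()` call is presumably still a short blocking round trip to the result backend.

```python
import asyncio


async def wait_for_result(async_result, interval=0.5, timeout=30.0):
    """Poll `async_result` until it is ready, yielding to the event loop
    between checks. Raises TimeoutError after `timeout` seconds."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not async_result.ready():
        if loop.time() >= deadline:
            raise TimeoutError("task did not finish within %s s" % timeout)
        # Other requests are served while this coroutine sleeps.
        await asyncio.sleep(interval)
    return async_result.result
```

Inside an `async def get(self, i)` handler this could be used as `self.write(str(await wait_for_result(dictasks[int(i)])))`. The polling interval trades latency against load on the result backend.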
JulienFr answered Sep 18 '22 12:09
