Celery includes a module that can make asynchronous HTTP requests using AMQP or another Celery backend. I am using the tornado-celery producer for asynchronous message publishing; as I understand it, tornado-celery uses pika for this. The question is how to adapt celery.task.http.URL for Tornado, that is, how to make it non-blocking. There are basically two places that have to be reworked:
1. HttpDispatch.make_request() has to be implemented using the Tornado async HTTP client;
2. URL.get_async(**kw) or URL.post_async(**kw) must be reimplemented with corresponding non-blocking code using the Tornado API. For instance:
    class NonBlockingURL(celery.task.http.URL):

        @gen.coroutine
        def post_async(self, **kwargs):
            async_res = yield gen.Task(self.dispatcher.delay,
                                       str(self), 'POST', **kwargs)
            raise gen.Return(async_res)
But I could not work out how to do this in a proper and concise way. How can I make it fully non-blocking and asynchronous? By the way, I am using the amqp backend.
Please point me to a good guideline or, even better, an example.
In fact, you have to decide whether to use Tornado's async approach or a queue such as Celery. There is no point in using both, because the queue reports a task's status quickly, so Tornado gains little from doing something else while it waits for the queue to respond. To decide between the two solutions, I would say:
Celery: more modular, easy to distribute across different cores or machines, and the tasks can be used by programs other than Tornado; on the other hand, you have to install and keep running extra software (AMQP broker, Celery workers...)
Async in Tornado: more monolithic, one program does everything, shorter code, only one program to run
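For the "Async in Tornado" option, a minimal sketch of a non-blocking POST could look like the following. It uses tornado.httpclient.AsyncHTTPClient and assumes Tornado 5 or later on Python 3.7+; the helper name post_form and the localhost URL are hypothetical and only serve the illustration.

```python
import asyncio
from urllib.parse import urlencode

from tornado.httpclient import AsyncHTTPClient


async def post_form(url, **params):
    """POST params as a form-encoded body without blocking the event loop.

    post_form is a hypothetical helper name, not part of Tornado or Celery.
    """
    client = AsyncHTTPClient()
    # fetch() returns a Future; awaiting it lets other handlers run meanwhile.
    response = await client.fetch(url, method="POST", body=urlencode(params))
    return response.code, response.body


if __name__ == "__main__":
    # Hypothetical endpoint, used only for illustration.
    print(asyncio.run(post_form("http://localhost:8888/echo", x=1)))
```

Inside a RequestHandler the same call can be awaited directly from an `async def post(self)` method, so no Celery worker is involved at all.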
To use Tornado's async approach, refer to the Tornado documentation. Here is a short solution that uses Celery and Tornado together:
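    # tasks.py
    import time

    from celery import Celery, current_task

    # backend='amqp' selects the AMQP result backend, which carries the progress states.
    celery = Celery('tasks', backend='amqp')


    @celery.task
    def MyTask(url, resid):
        # Simulate a long-running job and publish progress after each step.
        for i in range(10):
            time.sleep(1)
            current_task.update_state(state='running', meta={'i': i})
        return 'done'

    # Tornado application (separate file)
    import itertools

    from tornado.web import Application, RequestHandler

    import tasks

    dictasks = {}                  # task number -> AsyncResult
    task_ids = itertools.count()   # never reuses a number after a deletion


    class runtask(RequestHandler):
        def post(self):
            i = next(task_ids)
            # MyTask takes (url, resid); reading url from the request is an assumption.
            dictasks[i] = tasks.MyTask.delay(self.get_argument('url'), i)
            self.write(str(i))


    class chktask(RequestHandler):
        def get(self, i):
            i = int(i)
            if dictasks[i].ready():
                self.write(dictasks[i].result)
                del dictasks[i]
            else:
                # info is only a dict once the task has reported progress.
                info = dictasks[i].info if isinstance(dictasks[i].info, dict) else {}
                self.write(dictasks[i].state + ' i:' + str(info.get('i', -1)))


    application = Application([
        (r"/runtask", runtask),
        (r"/chktask/([0-9]+)", chktask),
        # etc.
    ])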
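If the handler itself should wait for the Celery result instead of having the client poll a status URL, one possible approach is to poll the result object from a coroutine and sleep between checks. The sketch below is an assumption-laden illustration, not part of Celery or Tornado: wait_for_result and FakeResult are hypothetical names, and it relies on Tornado 5 or later running on the asyncio event loop. Each ready() call on a real AsyncResult still performs a short synchronous backend lookup, so this only avoids blocking for the duration of the task, not for the lookup itself.

```python
import asyncio


async def wait_for_result(async_result, interval=0.1, timeout=30.0):
    """Poll a Celery AsyncResult-like object without blocking the event loop."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not async_result.ready():
        if loop.time() >= deadline:
            raise asyncio.TimeoutError("task did not finish in time")
        # Yield to the event loop so other requests are served meanwhile.
        await asyncio.sleep(interval)
    return async_result.result


class FakeResult:
    """Stand-in for celery.result.AsyncResult, for demonstration only."""

    def __init__(self, polls_until_ready, value):
        self._polls_left = polls_until_ready
        self.result = value

    def ready(self):
        # Report "not ready" for the first polls_until_ready calls.
        self._polls_left -= 1
        return self._polls_left < 0


if __name__ == "__main__":
    print(asyncio.run(wait_for_result(FakeResult(3, "done"), interval=0.01)))
```

In an `async def` Tornado handler this would be used as `result = await wait_for_result(tasks.MyTask.delay(url, resid))`, with the timeout chosen to fit the expected task duration.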