
Celery execute task with a batch of messages

I want to send messages to Celery and, once it has received, say, 100 of them, have Celery process them as a single batch. This is a common scenario when I want to commit to a database in batches.

While googling around I found the celery.contrib.batches module, which does batching with Celery: http://celery.readthedocs.org/en/latest/reference/celery.contrib.batches.html

My problem is that the example there shows no obvious way to get at the data submitted to the task.

For instance, let's say that we submit messages one by one with:

task.apply_async((message,), link_error=error_handler.s())

and then we have the following task implementation:

from celery.contrib.batches import Batches

@celery.task(name="process.data", base=Batches, flush_every=100, flush_interval=1)
def process_messages(requests):
    for request in requests:
        print(request)  # how can I get the message data submitted to the task?

Is there any alternative way to achieve batching with Celery? Thank you.

asked Nov 28 '14 14:11 by tbo


2 Answers

For anyone who finds this post useful: after much trial and error, I managed to get the data out of the SimpleRequest object in the following way.

When you submit your data like this:

func.delay(data)

the request object's args attribute is a list holding the positional arguments you passed, so the data is the first element:

request.args[0]
request.args[1] 
etc.

If you submit your data like this:

func.apply_async((), {'data': data}, link_error=error_handler.s())

then the data is available in the kwargs dictionary:

request.kwargs['data']

Finally, as the documentation example shows, we need to loop over all the requests to gather the batch data:

for r in requests:
    data = r.kwargs['data']
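To make the loop above concrete, here is a minimal sketch of gathering one payload per request and committing the whole batch in a single transaction. It runs without Celery: FakeRequest is a hypothetical stand-in that models only the args and kwargs attributes described above, and collect_batch, commit_batch, the 'data' key and the messages table are names made up for illustration, not part of Celery.

```python
import sqlite3
from collections import namedtuple

# Hypothetical stand-in for the request objects a Batches task receives;
# only the two attributes used in this answer (args, kwargs) are modelled.
FakeRequest = namedtuple("FakeRequest", ["args", "kwargs"])


def collect_batch(requests, key="data"):
    """Return one payload per request, from kwargs[key] or else args[0]."""
    batch = []
    for r in requests:
        if key in r.kwargs:
            batch.append(r.kwargs[key])
        elif r.args:
            batch.append(r.args[0])
        # requests that carry no payload are skipped
    return batch


def commit_batch(conn, batch):
    """Insert the whole batch inside one transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.executemany(
            "INSERT INTO messages (body) VALUES (?)",
            [(item,) for item in batch],
        )


requests = [
    FakeRequest(args=("first",), kwargs={}),          # as if sent with func.delay("first")
    FakeRequest(args=(), kwargs={"data": "second"}),  # as if sent with apply_async((), {'data': ...})
]

conn = sqlite3.connect(":memory:")  # in-memory database, for the example only
conn.execute("CREATE TABLE messages (body TEXT)")

batch = collect_batch(requests)
commit_batch(conn, batch)
```

Inside a real batch task, the body would presumably reduce to calling collect_batch(requests) and passing the result to whatever bulk write your database layer offers.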

It would be nice if the examples in the documentation were updated with a simpler, clearer one.

answered Sep 22 '22 23:09 by tbo


The last version of batches.py published before the module was deprecated (https://github.com/celery/celery/blob/3.1/celery/contrib/batches.py) doesn't work with Celery 5+ / Python 3.

A working version can be found at https://gist.github.com/robin-vjc/1a4676ccb055162082c5a061ab556f58
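If you only need the buffering behaviour and not the Celery integration, the idea behind flush_every and flush_interval can be sketched in plain Python. This is not the code from the gist and not a Celery API; BatchBuffer and its parameters are hypothetical names, and the time limit is only checked when an item is added, so a real implementation would also need a timer.

```python
import time


class BatchBuffer:
    """Collects items and hands them to flush_fn in batches (illustrative only)."""

    def __init__(self, flush_fn, flush_every=100, flush_interval=1.0,
                 clock=time.monotonic):
        self.flush_fn = flush_fn              # called with a list of buffered items
        self.flush_every = flush_every        # flush once this many items are buffered
        self.flush_interval = flush_interval  # or once this many seconds have passed
        self.clock = clock
        self._items = []
        self._last_flush = clock()

    def add(self, item):
        self._items.append(item)
        too_many = len(self._items) >= self.flush_every
        too_old = self.clock() - self._last_flush >= self.flush_interval
        if too_many or too_old:
            self.flush()

    def flush(self):
        if self._items:
            # swap in a fresh list so flush_fn owns the batch it receives
            batch, self._items = self._items, []
            self.flush_fn(batch)
        self._last_flush = self.clock()


batches = []
buffer = BatchBuffer(batches.append, flush_every=3, flush_interval=3600)
for n in range(7):
    buffer.add(n)
buffer.flush()  # flush the leftover item that never reached flush_every
```

With flush_every=3, the seven items arrive at flush_fn as three lists: two full batches and one leftover.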

answered Sep 21 '22 23:09 by RVC