I want to send messages to Celery, and once it reaches, say, 100 messages, I want Celery to execute them as a batch. This is a common scenario when you want to commit to a database in batches.
While googling around for this, I found the following link on doing batches with Celery: http://celery.readthedocs.org/en/latest/reference/celery.contrib.batches.html
My problem is that in the example there is no obvious way to get the data submitted to the task.
For instance, let's say that we submit some messages one by one with:
task.apply_async((message,), link_error=error_handler.s())
and then we have the following task implementation:
@celery.task(name="process.data", base=Batches, flush_every=100, flush_interval=1)
def process_messages(requests):
    for request in requests:
        print(request)  # how can I get the message data submitted to my task for processing?
Is there any alternative way to achieve batches with Celery? Thank you.
For anyone who finds this post useful: after much trial and error, I managed to get the data out of the SimpleRequest object in the following way.
When you submit your data like this:
func.delay(data)
the request object's args attribute is a list containing the data:
request.args[0]
request.args[1]
etc.
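Putting that together, here is a minimal sketch of a batch task that reads the positional argument (this assumes the celery.contrib.batches API from the link above and an app instance named celery, as in the question):

from celery.contrib.batches import Batches

@celery.task(name="process.data", base=Batches, flush_every=100, flush_interval=1)
def process_messages(requests):
    # Each item in requests is a SimpleRequest; positional arguments
    # passed via func.delay(data) end up in request.args.
    for request in requests:
        data = request.args[0]
        print(data)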
If you submit your data like this:
func.apply_async((), {'data': data}, link_error=error_handler.s())
then the data is available as a dictionary in kwargs:
request.kwargs['data']
Finally, as the example shows, we need to loop over all the requests to gather the batch of data:
for r in requests:
    data = r.kwargs['data']
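For the batch-commit use case from the original question, the task can then write everything in one transaction. Below is a minimal sketch, assuming keyword-style submission and a hypothetical SQLAlchemy session and Record model (neither comes from the Celery docs):

from celery.contrib.batches import Batches

@celery.task(name="process.data", base=Batches, flush_every=100, flush_interval=1)
def process_messages(requests):
    # Collect the keyword argument from every buffered request.
    batch = [r.kwargs['data'] for r in requests]
    # Commit the whole batch at once (session and Record are hypothetical).
    session.add_all(Record(payload=item) for item in batch)
    session.commit()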
It would be nice if the examples on the documentation page linked above were updated with a simpler, clearer example.
The last version of batches.py before it was deprecated, available at https://github.com/celery/celery/blob/3.1/celery/contrib/batches.py, doesn't work with Celery 5+ / Python 3.
A working version can be found at https://gist.github.com/robin-vjc/1a4676ccb055162082c5a061ab556f58