Consider this Celery workflow:
wf = collect_items.s() | add_details.s() | publish_items.s()
It collects some items, adds extra details to each one in parallel, then publishes the decorated information somewhere.
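The pipe operator is just Celery's shorthand for an explicit chain, so the same workflow could be spelled out as:

from celery import chain

wf = chain(collect_items.s(), add_details.s(), publish_items.s())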
What I want is for add_details to behave as a group of tasks, one per item, that fetch the details for each item in parallel. Obviously the group has to be generated from the data output by collect_items.
Here's what I tried, using the default RabbitMQ broker:
from celery import Celery, group

app = Celery(backend="rpc://")

@app.task
def collect_items(n):
    return range(n)

@app.task
def add_details(items):
    return group(get_details.s(i) for i in items).delay()

@app.task
def get_details(item):
    return (item, item * item)

@app.task
def publish_items(items):
    print("items = %r" % items)
I want the output to be numbers 0-9 decorated with their squares, all calculated concurrently:
>>> wf.delay(10).get()
items = [(0, 0), (1, 1), (2, 4), ... (8, 64), (9, 81)]
This does invoke the expected tasks, but unfortunately it passes the results to publish_items as a bunch of GroupResults containing AsyncResults with PENDING status, even though the tasks appear to have completed. I can't wait for those results in publish_items because you can't use get() in a task (risk of deadlocks etc.). I thought Celery would recognise when a task like add_details returns a GroupResult and do a get on it before passing that value on to the next task in the chain.
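To make the constraint concrete, this is the tempting workaround that Celery explicitly forbids (a sketch of the anti-pattern, not a fix):

@app.task
def add_details(items):
    # Anti-pattern: blocking on subtask results inside a task can
    # deadlock the worker pool; recent Celery versions raise a
    # RuntimeError here by default.
    return group(get_details.s(i) for i in items).delay().get()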
This seems like a common pattern. Is there any way to do it in Celery?
I've seen similar questions here but the answers seem to assume a lot of intimate knowledge of how Celery works under the covers, and they don't work for me anyway.
Here is your example, working somewhat differently but, in my opinion, achieving the expected result.
from celery import Celery, group
from celery.utils.log import get_task_logger

app = Celery(backend="rpc://")
logger = get_task_logger(__name__)

@app.task
def collect_items(n):
    logger.info("collect %r items", n)
    items = list(range(n))
    return items

@app.task
def schedule_task_group(items):
    logger.info("group get_details tasks & pass results to publish_items")
    return (
        group(get_details.s(i) for i in items) | publish_items.s()
    ).delay()

@app.task
def get_details(item):
    logger.info("get item detail for = %r", item)
    return (item, item * item)

@app.task
def publish_items(items):
    logger.info("publish items = %r", items)
    return items

print('schedule collect_items & pass result to schedule_task_group with n = 5')
(collect_items.s(5) | schedule_task_group.s()).delay()
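If everything runs, the final publish_items call should log items = [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)] for n = 5, since the chord delivers the group's results in the order the signatures were declared.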
The main difference from your code is that I'm chaining the get_details group with the publish_items task, effectively making it a chord. This is mentioned in the docs and is needed because you want the whole group of tasks to be scheduled, and done running, before its results are passed on to publish_items.
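For what it's worth, the same step can also be written with an explicit chord; a minimal sketch, equivalent to the group | task form above:

from celery import chord

@app.task
def schedule_task_group(items):
    # chord(header)(body): run every get_details in the header group,
    # then call publish_items once with the ordered list of results.
    return chord(get_details.s(i) for i in items)(publish_items.s())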
Please check it out, @quantoid, and let me know what you think.
Note that running Celery with the -l INFO flag will make it easier to visualize what actually happens in the workers.
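For example (assuming the code above lives in a module called tasks.py; adjust -A to your app):

celery -A tasks worker -l INFO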
Refs:
- http://docs.celeryproject.org/en/latest/userguide/canvas.html
- https://stackoverflow.com/a/15147171/484127