 

How can Celery workflows include dynamically generated groups?

Tags: python, celery

Consider this Celery workflow:

wf = collect_items.s() | add_details.s() | publish_items.s()

It collects some items, adds extra details to each one in parallel, then publishes the decorated information somewhere.

What I want is for add_details to behave as a group of tasks, one per item, that fetch the details for each item in parallel. Obviously the group has to be generated from the data output by collect_items.

Here's what I tried, using the default rabbitmq broker:

from celery import Celery, group

app = Celery(backend="rpc://")

@app.task
def collect_items(n):
    return list(range(n))

@app.task
def add_details(items):
    return group(get_details.s(i) for i in items).delay()

@app.task
def get_details(item):
    return (item, item * item)

@app.task
def publish_items(items):
    print("items = %r" % items)

I want the output to be numbers 0-9 decorated with their squares, all calculated concurrently:

>>> wf.delay(10).get()
items = [(0, 0), (1, 1), (2, 4), ... (8, 64), (9, 81)]

This does invoke the expected tasks, but unfortunately it passes publish_items a GroupResult full of AsyncResults still in PENDING state, even though the tasks appear to have completed.

I can't wait for those results inside publish_items because you can't call get() in a task (risk of deadlocks etc.). I thought Celery would recognise when a task like add_details returns a GroupResult and resolve it before passing the value on to the next task in the chain.
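To illustrate, here's the naive blocking version that the deadlock risk rules out (just a sketch of what I would write if waiting inside a task were safe; add_details_blocking is a hypothetical name):

@app.task
def add_details_blocking(items):
    # Hypothetical variant: block until every get_details task finishes.
    # If all worker processes sit here waiting, none are left to actually
    # run get_details, so the whole thing can deadlock; newer Celery
    # versions refuse to let a task call get() for exactly this reason.
    result = group(get_details.s(i) for i in items).delay()
    return result.get()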

This seems like a common pattern; is there any way to do it in Celery?

I've seen similar questions here but the answers seem to assume a lot of intimate knowledge of how Celery works under the covers, and they don't work for me anyway.

asked Feb 28 '18 by quantoid

1 Answer

Here is your example, working somewhat differently but, in my opinion, achieving the expected result.

from celery import Celery, group
from celery.utils.log import get_task_logger

app = Celery(backend="rpc://")  # same broker/backend setup as in the question
logger = get_task_logger(__name__)


@app.task
def collect_items(n):
    logger.info("collect %r items", n)
    items = list(range(n))
    return items


@app.task
def schedule_task_group(items):
    logger.info("group get_details tasks & pass results to publish_items")
    return (
        group(get_details.s(i) for i in items) | publish_items.s()
    ).delay()


@app.task
def get_details(item):
    logger.info("get item detail for = %r", item)
    return (item, item * item)


@app.task
def publish_items(items):
    logger.info("publish items = %r", items)
    return items


print('schedule collect_items & pass result to schedule_task_group with n = 5')
(collect_items.s(5) | schedule_task_group.s()).delay()

The main difference from your code is that I chain the get_details group with the publish_items task, effectively making it a chord. This is mentioned in the docs and is needed because you want the whole group of tasks to be scheduled, and done running, before its results are passed to publish_items.
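For comparison, the group-piped-into-task form is shorthand for an explicit chord. This sketch is equivalent to schedule_task_group above (schedule_task_group_chord is just an illustrative name):

from celery import chord

@app.task
def schedule_task_group_chord(items):
    # chord(header)(body) schedules the header group, then calls the body
    # signature with the ordered list of all the group's results once
    # every header task has finished.
    return chord(get_details.s(i) for i in items)(publish_items.s())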

Please check it out @quantoid and let me know what you think. Note that running the Celery worker with the -l INFO flag makes it easier to see what actually happens in the workers.
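For example, assuming the tasks above live in a module called tasks (the module name here is just a placeholder):

celery -A tasks worker -l INFO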

References:
http://docs.celeryproject.org/en/latest/userguide/canvas.html
https://stackoverflow.com/a/15147171/484127

answered Nov 09 '22 by tutuDajuju