I want to use a group (or chunks) inside a chain, like:
chain(getRange.s(3), GROUP() , xsum.s() )
Where GROUP()
is a group of double()
tasks, i.e. group(double(0),double(1),double(2))
.
A similar question was posted in How to chain a Celery task that returns a list into a group? but it's not explained how to pass the output from group to the next task in the chain.
@task
def getRange(x):
return range(x)
@task
def double(nr):
return nr*2
@task
def xsum(list):
return sum(list)
I don't believe there is a way to do that with the current primitives in a single chain. Passing callbacks like in the question you mention won't allow you to listen to when the group tasks have finished. The closest you can get is something like:
@task
def get_range(x):
return range(x)
@task
def mapper(nr):
return nr * 2
@task
def reducer(nrs):
return sum(nrs)
@task
def double_then_sum(nrs):
return (
group([mapper.s(nr) for nr in nrs]) |
reducer.s()
)()
ar = (get_range.s(3) | double_then_sum.s())() # call the procedure
ar.result.result # get the result
Otherwise you could try using dynamic chaining, which would lead to a simpler solution, or just use map
if you don't need your grouped tasks to run in parallel.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With