Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery - group inside a chain

I want to use a group (or chunks) inside a chain, like:

chain(getRange.s(3),  GROUP() , xsum.s() )

Where GROUP() is a group of double() tasks, i.e. group(double(0),double(1),double(2)). A similar question was posted in How to chain a Celery task that returns a list into a group? but it's not explained how to pass the output from group to the next task in the chain.

@task
def getRange(x):
    return range(x)

@task
def double(nr):
    return nr*2

@task
def xsum(list):
    return sum(list)
like image 203
sergiuz Avatar asked Dec 18 '12 13:12

sergiuz


1 Answers

I don't believe there is a way to do that with the current primitives in a single chain. Passing callbacks like in the question you mention won't allow you to listen to when the group tasks have finished. The closest you can get is something like:

@task
def get_range(x):
  return range(x)

@task
def mapper(nr):
  return nr * 2

@task
def reducer(nrs):
  return sum(nrs)

@task
def double_then_sum(nrs):
  return (
    group([mapper.s(nr) for nr in nrs]) |
    reducer.s()
  )()

ar = (get_range.s(3) | double_then_sum.s())() # call the procedure
ar.result.result # get the result

Otherwise you could try using dynamic chaining, which would lead to a simpler solution, or just use map if you don't need your grouped tasks to run in parallel.

like image 111
mtth Avatar answered Nov 05 '22 12:11

mtth