
Celery: access all previous results in a chain

I have a fairly complex workflow that looks similar to this:

>>> res = (add.si(2, 2) | add.s(4) | add.s(8))()
>>> res.get()
16

Afterwards it's rather trivial for me to walk up the result chain and collect all individual results:

>>> res.parent.get()
8

>>> res.parent.parent.get()
4
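
The same walk can be wrapped in a small helper. This is only a sketch: `collect_chain_results` is a hypothetical name, and it assumes every link in the chain exposes `.get()` and `.parent` the way the results above do, with `parent` being `None` at the first task.

```python
def collect_chain_results(result):
    """Return the results of a chain, first task first.

    Hypothetical helper: assumes `result` and each of its ancestors
    expose `.get()` and `.parent`, with `parent` set to None at the
    first task of the chain.
    """
    values = []
    node = result
    while node is not None:
        values.append(node.get())  # blocks until this step has finished
        node = node.parent         # step back towards the first task
    values.reverse()               # collected last-to-first, so flip it
    return values
```

For the chain above, `collect_chain_results(res)` should give `[4, 8, 16]`. It only collects results after the fact; it does not make an earlier result available to a later task.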

My problem: what if the third task needs the result of the first one but, as in this example, only receives the result of the second?

Also, the chains are quite long and the results aren't small, so just passing the input through as part of each result would unnecessarily pollute the result store. The result store is Redis, so the limitations of RabbitMQ, ZeroMQ, ... don't apply.

WhatIsName asked Apr 09 '15 19:04


1 Answer

Maybe your setup is too complex for this, but I like to use a group combined with a noop task to accomplish something similar. I do it this way because it highlights the areas of my pipeline that are still synchronous (usually so they can be removed).

Using something similar to your example, I start with a set of tasks which look like this:

tasks.py:

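from celery import Celery

# Redis serves as both the broker and the result backend.
app = Celery('tasks', backend='redis', broker='redis://localhost')


@app.task
def add(x, y):
    return x + y


@app.task
def xsum(elements):
    # Sum a list of results, e.g. the output of a group.
    return sum(elements)


@app.task
def noop(ignored):
    # Pass the input through unchanged so a group can carry it forward.
    return ignored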

With these tasks I build a chain that uses a group to carry an earlier result forward alongside the steps that depend on it:

In [1]: from tasks import add, xsum, noop
In [2]: from celery import group

# Run the task whose value is needed later, then send its result to a group:
# noop passes it through untouched, while the other branch is the pipeline.
# (~sig is shorthand for sig.apply_async().get().)
In [3]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)))
Out[3]: [4, 16]

# The result is a list: the first element is the original task's result,
# the second is the result of the pipeline. A following task receives both.
In [4]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)) | xsum.s())
Out[4]: 20

# From here, things can go back to a normal chain.
In [5]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)) | xsum.s() | add.s(1) | add.s(1))
Out[5]: 22

I hope this is useful!

erik-e answered Nov 15 '22 15:11