Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Get progress from async python celery chain by chain id

Tags:

python

celery

I'm trying to get the progress of a task chain by querying each task status. But when retrieving the chain by it's id, I get some object that behaves differently.

In tasks.py

from celery import Celery

celery = Celery('tasks')
celery.config_from_object('celeryconfig')

def unpack_chain(nodes): 
    while nodes.parent:
        yield nodes.parent
        nodes = nodes.parent
    yield nodes

@celery.task
def add(num, num2):
    return num + num2

When quering from ipython...

In [43]: from celery import chain
In [44]: from tasks import celery, add, unpack_chain
In [45]: c = chain(add.s(3,3), add.s(10).set(countdown=100))
In [46]: m = c.apply_async()
In [47]: a = celery.AsyncResult(m.id)
In [48]: a == m
Out[48]: True
In [49]: a.id == m.id
Out[49]: True
In [50]: [t.status for t in list(unpack_chain(a))]
Out[50]: ['PENDING']
In [51]: [t.status for t in list(unpack_chain(m))]
Out[51]: ['PENDING', 'SUCCESS']

Using Python 2.7.3 and Celery 3.0.19 under Redis.

As you can see in 50 & 51, the value returned by celery.AsyncResult differs from the original chain.

How can I get the original chain tasks list by the chain id?

like image 250
Hernantz Avatar asked Apr 30 '13 18:04

Hernantz


1 Answers

Like @Hernantz said, you can't recover the parent chain from just the task ID, you'd have to iterate over your queue which may or may not be possible depending on what you use as a broker.

But if you have the last task id to do the lookup then you have the chain, you just need to store all the task ids and rebuild the chain when you need to examine their status. You can use the following functions:

def store(node):
    id_chain = []
    while node.parent:
      id_chain.append(node.id)
      node = node.parent
    id_chain.append(node.id)
    return id_chain

def restore(id_chain):
    id_chain.reverse()
    last_result = None
    for tid in id_chain:
        result = celery.AsyncResult(tid)
        result.parent = last_result
        last_result = result
    return last_result

Call store when you first get a AsyncResult from chain. Calling restore on that will give you a linked list of AsyncResults like chain gives you.

like image 51
Ryan Jenkins Avatar answered Nov 05 '22 23:11

Ryan Jenkins