import time

import celery
from celery import current_task
# Assumed import: location of the task decorator in Celery 3.x/4.x
from celery.task import task


def temptask(n):
    header = [tempsubtask.si(i) for i in range(n)]
    callback = templink.si('printed at last?')
    r = celery.chord(celery.group(header))(callback)
    return r


@task()
def tempsubtask(i):
    print(i)
    for x in range(i):
        time.sleep(2)
        current_task.update_state(
            state='PROGRESS', meta={'completed': x, 'total': i})


@task()
def templink(x):
    print('this should be run at last %s' % x)


# executing temptask
r = temptask(100)
I want access to the progress status updated by tempsubtask. How can I go about achieving this?
I've had a similar question. Most examples on the net are outdated and the docs didn't help much, but the docs link to the sources, and reading those did help me. My objective was to organize parallel tasks in groups, with the groups executed sequentially, in order. So I decided to generate the task ids separately, before starting any tasks, and then only assign them to the tasks. I'm using Celery 4.3.0.
Here's a brief example.
Firstly, I needed a dummy task to make execution sequential and to be able to check the state of a certain group. As it is used as a chord callback, it completes only after all the other tasks in the group have completed.
@celery.task(bind=True, name="app.tasks.dummy_task")
def dummy_task(self, results=None, *args, **kwargs):
    return results
The comments in the code below explain how I assign the ids.
from celery.utils import uuid
from celery import group, chord, chain
# Generating task ids,
# which can be saved to a db, sent to the client and so on
#
# This is done before executing any tasks
task_id_1 = uuid()
task_id_2 = uuid()
chord_callback_id_1 = uuid()
chord_callback_id_2 = uuid()
workflow_id = None
# Generating groups, using signatures
# the group may contain any number of tasks
group_1 = group(
    [
        celery.signature(
            'app.tasks.real_task',
            args=(),
            kwargs={'email': some_email, 'data': some_data},
            options={'task_id': task_id_1}
        )
    ]
)
group_2 = group(
    [
        celery.signature(
            'app.tasks.real_task',
            args=(),
            kwargs={'email': some_email, 'data': some_data},
            options={'task_id': task_id_2}
        )
    ]
)
# Creating the callback tasks, which simply relay the result
# Using the task ids, which have been generated before
#
# The dummy task starts after all tasks in its group are completed
# This way we know that the group is completed
chord_callback = celery.signature(
    'app.tasks.dummy_task',
    options={'task_id': chord_callback_id_1}
)
chord_callback_2 = celery.signature(
    'app.tasks.dummy_task',
    options={'task_id': chord_callback_id_2}
)
# We can monitor each step's status
# by its chord callback id
# step 1 is tracked by chord_callback_id_1
step1 = chord(group_1, body=chord_callback)
# step 2 is tracked by chord_callback_id_2
step2 = chord(group_2, body=chord_callback_2)
# start the workflow execution
# the steps will execute sequentially
workflow = chain( step1, step2 )()
# the id of the last chord callback
workflow_id = workflow.id
# return any ids you need
print( workflow_id )
This is how I check the status of any task in my app.
# This is a simplified example
# some code is omitted
from celery.result import AsyncResult
from flask import jsonify  # assumption: the app is a Flask app


def task_status(task_id=None):
    # Possible states:
    # PENDING
    # RECEIVED
    # STARTED
    # SUCCESS
    # FAILURE
    # REVOKED
    # RETRY
    task = AsyncResult(task_id)
    response = {
        'state': task.state,
    }
    return jsonify(response), 200
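The state alone does not expose the `meta` dict that a task passes to `update_state`. While a task is in a custom state such as `PROGRESS`, Celery makes that dict available through `AsyncResult.info`. Below is a minimal sketch of how the response could carry it, assuming the `completed`/`total` keys from the question. `status_payload` and the stand-in result object are hypothetical names for illustration; only the `.state` and `.info` attributes are relied on.

```python
from types import SimpleNamespace


def status_payload(result):
    """Build a JSON-serialisable dict from an AsyncResult-like object.

    Only `.state` and `.info` are read, so a real
    celery.result.AsyncResult could be passed in unchanged.
    """
    payload = {'state': result.state}
    # For a custom state, .info holds the meta dict given to update_state().
    if result.state == 'PROGRESS' and isinstance(result.info, dict):
        payload['completed'] = result.info.get('completed', 0)
        payload['total'] = result.info.get('total', 0)
    return payload


# Stand-in for AsyncResult(task_id), so the sketch runs without a broker.
fake = SimpleNamespace(state='PROGRESS', info={'completed': 3, 'total': 10})
print(status_payload(fake))
```

In the view above, `response = status_payload(AsyncResult(task_id))` would be the only change needed.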
After hours of googling I stumbled upon http://www.manasupo.com/2012/03/chord-progress-in-celery.html. Though the solution there didn't work for me out of the box, it did inspire me to try something similar.
from celery.utils import uuid
from celery import chord


class ProgressChord(chord):

    def __call__(self, body=None, **kwargs):
        _chord = self.type
        body = (body or self.kwargs['body']).clone()
        kwargs = dict(self.kwargs, body=body, **kwargs)
        if _chord.app.conf.CELERY_ALWAYS_EAGER:
            return self.apply((), kwargs)
        callback_id = body.options.setdefault('task_id', uuid())
        r = _chord(**kwargs)
        return _chord.AsyncResult(callback_id), r
Then, instead of calling celery.chord, I use ProgressChord as follows:
def temptask(n):
    header = [tempsubtask.si(i) for i in range(n)]
    callback = templink.si('printed at last?')
    r = ProgressChord(celery.group(header))(callback)
    return r
The returned value r is a tuple containing both the callback's AsyncResult and the header's GroupResult. A successful call looked something like this:
In [3]: r
Out[3]:
(<AsyncResult: bf87507c-14cb-4ac4-8070-d32e4ff326a6>,
<GroupResult: af69e131-5a93-492d-b985-267484651d95 [4672cbbb-8ec3-4a9e-971a-275807124fae, a236e55f-b312-485c-a816-499d39d7de41, e825a072-b23c-43f2-b920-350413fd5c9e, e3f8378d-fd02-4a34-934b-39a5a735871d, c4f7093b-9f1a-4e5e-b90d-66f83b9c97c4, d5c7dc2c-4e10-4e71-ba2b-055a33e15f02, 07b1c6f7-fe95-4c1f-b0ba-6bc82bceaa4e, 00966cb8-41c2-4e95-b5e7-d8604c000927, e039c78e-6647-4c8d-b59b-e9baf73171a0, 6cfdef0a-25a2-4905-a40e-fea9c7940044]>)
I inherited from and overrode [celery.chord][1] instead of [celery.task.chords.Chord][2] because I couldn't find its source anywhere.
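With the GroupResult in hand, the progress reported by each tempsubtask could be collected roughly as follows. This is a sketch under assumptions: `chord_progress` is a hypothetical helper, it relies only on each child result exposing `.state` and `.info` (as Celery's `AsyncResult` does), and it assumes the `completed`/`total` meta keys from the question. Stand-in objects are used so the example runs without a broker.

```python
from types import SimpleNamespace


def chord_progress(results):
    """Summarise the header tasks of a chord.

    `results` is any iterable of AsyncResult-like objects; only the
    `.state` and `.info` attributes of each one are read.
    """
    summary = {'total_tasks': 0, 'finished': 0, 'running': []}
    for res in results:
        summary['total_tasks'] += 1
        if res.state == 'SUCCESS':
            summary['finished'] += 1
        elif res.state == 'PROGRESS' and isinstance(res.info, dict):
            # .info is the meta dict the subtask passed to update_state()
            summary['running'].append(
                (res.info.get('completed'), res.info.get('total')))
    return summary


# Stand-ins for the child results of the GroupResult.
children = [
    SimpleNamespace(state='SUCCESS', info=None),
    SimpleNamespace(state='PROGRESS', info={'completed': 2, 'total': 5}),
    SimpleNamespace(state='PENDING', info=None),
]
print(chord_progress(children))
```

With the tuple returned by ProgressChord, the real call would be something like `chord_progress(r[1].results)`, polled until `r[0]` (the callback's result) is ready.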