In Celery I'm running a main task that runs one subtask for each item it gets from a query. The subtasks should run in parallel. In the UI I have a progress bar that shows how many subtasks are done out of the total, and I update the main task's state to feed that progress bar. My problem is that the main task ends right after pushing all the subtasks to the broker, so I can no longer update its state. I'd like the main task to wait until all the subtasks are done. Is that possible? Are there other solutions? Here's my pseudocode (the real code doesn't use globals ;-)):
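```python
from celery.states import STARTED
from celery.task import task

from myapp.models import Document  # hypothetical app path

# Illustration only: module globals are not shared between worker processes.
total = 0
done = 0

@task(ignore_result=True)
def copy_media(path):
    global total, done
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    for document in documents:
        # delay() returns as soon as the message is sent; nothing waits for it.
        process_doc.delay(document, path, copy_media)

@task(ignore_result=True)
def process_doc(document, path, copy_media):
    global total, done
    # Do some stuff
    done += 1
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
```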
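A note on the globals in the pseudocode: separate worker processes (and separate machines) each have their own copy of module-level variables, so `done` would not actually be shared between `process_doc` runs. One common way around both that and the short-lived parent is to keep the counter in a store every worker can reach and have the progress bar read it from there, so `copy_media` can still return right away. The sketch below is framework-free and hypothetical: `ThreadPoolExecutor` stands in for the Celery workers, and the `ProgressStore` class stands in for a shared store such as a cache key or database row. None of these names are Celery APIs.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ProgressStore:
    """Hypothetical stand-in for storage shared by all workers
    (e.g. a cache key or a database row), not a Celery API."""

    def __init__(self):
        self._lock = threading.Lock()
        self._progress = {}

    def start(self, job_id, total):
        with self._lock:
            self._progress[job_id] = {"total": total, "done": 0}

    def increment(self, job_id):
        # Atomic read-modify-write so parallel subtasks never lose an update.
        with self._lock:
            self._progress[job_id]["done"] += 1

    def get(self, job_id):
        # Return a copy so callers can't mutate the stored counters.
        with self._lock:
            return dict(self._progress[job_id])


def process_doc(store, job_id, document):
    # ... do some stuff with `document` ...
    store.increment(job_id)


def copy_media(pool, store, job_id, documents):
    # Record the total first, then fire off one subtask per document
    # and return immediately, like the original parent task.
    store.start(job_id, len(documents))
    for document in documents:
        pool.submit(process_doc, store, job_id, document)
```

For example, calling `copy_media` with three documents inside a `with ThreadPoolExecutor(max_workers=4) as pool:` block and reading `store.get(job_id)` after the block exits gives `{'total': 3, 'done': 3}`; reads taken while the subtasks are still running show the partial count, which is what a progress bar would poll.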
I found a way using `TaskSet`, but I'm not totally satisfied because I can't ignore the results of the subtasks. If I set `ignore_result` on the `process_doc` task, `results.ready()` always returns `False`, `results.completed_count()` always returns 0, and so on. Here's the code:
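```python
import time

from celery.states import STARTED
from celery.task import task
from celery.task.sets import TaskSet

from myapp.models import Document  # hypothetical app path

@task(ignore_result=True)
def copy_media(path):
    # Initialise before the first update_state() to avoid an UnboundLocalError.
    total = done = 0
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    job = TaskSet(tasks=[process_doc.subtask((document, path))
                         for document in documents])
    results = job.apply_async()
    doc_name = ''
    # Poll until every subtask has finished; this only works if the
    # subtask results are stored (hence no ignore_result on process_doc).
    while not results.ready():
        done = results.completed_count()
        if done:
            # Scan backwards for a ready subtask and report its result.
            for idx in xrange(done - 1, -1, -1):
                if results[idx].ready():
                    doc_name = results[idx].result
                    break
        copy_media.update_state(state=STARTED,
                                meta={'total': total, 'done': done, 'doc-name': doc_name})
        time.sleep(0.25)

@task()
def process_doc(document, path):
    # Do some stuff
    return document
```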
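The polling loop above re-checks on a fixed `time.sleep(0.25)` and then has to search for a finished result to name. The same control flow can be sketched in plain Python with `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)`, which blocks until at least one subtask finishes and hands back exactly the ones that did. This only illustrates the wait-and-report pattern, not Celery itself: the thread pool stands in for the workers, and the `report` callback is a hypothetical stand-in for `copy_media.update_state`.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def process_doc(document):
    # ... do some stuff ...
    return document  # the parent uses this as the "doc-name"


def copy_media(documents, report, max_workers=4):
    total = len(documents)
    report({"total": total, "done": 0, "doc-name": ""})
    done = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(process_doc, doc) for doc in documents}
        while pending:
            # Block until at least one subtask finishes (no fixed sleep).
            finished, pending = wait(pending, return_when=FIRST_COMPLETED)
            done += len(finished)
            # Name of one subtask that just completed; re-raises if it failed.
            doc_name = next(iter(finished)).result()
            report({"total": total, "done": done, "doc-name": doc_name})
    return done
```

With `updates = []`, calling `copy_media(['a.jpg', 'b.jpg', 'c.jpg'], updates.append)` returns 3; `updates` starts with `{'total': 3, 'done': 0, 'doc-name': ''}` and its last entry has `done` equal to 3.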