In Celery, how do I update the state of a main task until all its subtasks are done?

In Celery I'm running a main task that runs one subtask for each item it gets from a query. The subtasks should run in parallel. On the UI I have a progress bar that shows how many subtasks are done out of the total, and I feed it by updating the main task's state. My problem is that the main task ends right after pushing all the subtasks to the broker, so I can't update its state anymore. I'd like the main task to wait until all the subtasks are done. Is that possible? Are there other solutions? Here's my pseudo code (the real code doesn't use globals ;-)).

from celery import states
from celery.task import task
# Document is a Django model from my app (import not shown).

total = 0
done = 0

@task(ignore_result=True)
def copy_media(path):
    global total, done
    copy_media.update_state(state=states.STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=states.STARTED, meta={'total': total, 'done': done})
    for document in documents:
        # Only queues the subtask; copy_media returns as soon as this loop ends.
        process_doc.delay(document, path, copy_media)

@task(ignore_result=True)
def process_doc(document, path, copy_media):
    global total, done
    # Do some stuff
    done += 1
    copy_media.update_state(state=states.STARTED, meta={'total': total, 'done': done})
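On the UI side, the progress bar only needs the meta dict that update_state stores ({'total': ..., 'done': ...}). A minimal sketch, assuming the view has already fetched that dict (in Celery the meta of a custom state is exposed as AsyncResult.info); progress_percent is a hypothetical helper name, not part of Celery.

```python
def progress_percent(meta):
    """Turn a {'total': ..., 'done': ...} meta dict into an integer from 0 to 100."""
    total = meta.get("total", 0)
    done = meta.get("done", 0)
    if total <= 0:
        # The first update_state runs before the query is counted, so total
        # can still be 0 here; report 0% instead of dividing by zero.
        return 0
    # Clamp so a stale or overshooting counter never shows more than 100%.
    return min(100, done * 100 // total)
```

For example, a meta of {'total': 4, 'done': 1} maps to 25, and {'total': 3, 'done': 2} maps to 66 because of the integer division.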
asked Nov 05 '22 02:11 by Etienne


1 Answer

I found a way using TaskSet, but I'm not totally satisfied because I can't ignore the results of the subtasks. If I set ignore_result on the process_doc task, results.ready() always returns False, results.completed_count() always returns 0, and so on: since no result is stored in the result backend, every subtask keeps reporting PENDING. Here's the code:

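import time

from celery import states
from celery.task import task
from celery.task.sets import TaskSet  # old API; newer Celery versions use group instead
# Document is a Django model from my app (import not shown).

@task(ignore_result=True)
def copy_media(path):
    # Initialise the counters before the first update_state call
    # (using them unassigned would raise UnboundLocalError).
    total = done = 0
    copy_media.update_state(state=states.STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=states.STARTED, meta={'total': total, 'done': done})
    # One subtask per document; the TaskSet runs them in parallel.
    job = TaskSet(tasks=[process_doc.subtask((document, path))
                         for document in documents])
    results = job.apply_async()
    doc_name = ''
    # Keep the main task alive, polling the subtasks until all are finished.
    while not results.ready():
        done = results.completed_count()
        if done:
            # Walk back from index done - 1 to find a finished subtask to report.
            for idx in range(done - 1, -1, -1):
                if results[idx].ready():
                    doc_name = results[idx].result
                    break
        copy_media.update_state(state=states.STARTED,
                                meta={'total': total, 'done': done, 'doc-name': doc_name})
        time.sleep(0.25)

# Results must NOT be ignored here: ready() and completed_count() read them from the backend.
@task()
def process_doc(document, path):
    # Do some stuff
    return document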
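The polling loop above only works because each process_doc result is stored, which is why ignore_result can't be set on it. The stdlib-only toy below models that dependency: a thread pool stands in for the workers and a dict-backed ToyBackend stands in for the result backend. ToyBackend, the timeout parameter and the snapshot list are illustrative assumptions, not Celery's API; the one behavior borrowed from Celery is that an id with no stored result reads as PENDING.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ToyBackend:
    """Toy stand-in for a result backend (NOT Celery's API): task id -> state."""

    def __init__(self):
        self._states = {}
        self._lock = threading.Lock()

    def store(self, task_id, state):
        with self._lock:
            self._states[task_id] = state

    def state(self, task_id):
        # As in Celery, an id with no stored result reads as PENDING.
        with self._lock:
            return self._states.get(task_id, "PENDING")


def process_doc(backend, task_id, document, ignore_result):
    # "Do some stuff" would happen here.
    if not ignore_result:
        backend.store(task_id, "SUCCESS")
    return document


def copy_media(documents, ignore_result=False, timeout=2.0):
    """Dispatch one subtask per document, poll the backend, return progress snapshots."""
    backend = ToyBackend()
    total = len(documents)
    snapshots = [{"total": total, "done": 0}]
    ids = list(range(total))
    with ThreadPoolExecutor(max_workers=4) as pool:
        for task_id, document in zip(ids, documents):
            pool.submit(process_doc, backend, task_id, document, ignore_result)
        deadline = time.monotonic() + timeout
        while True:
            # Rough analogue of results.completed_count(): count SUCCESS states.
            done = sum(backend.state(i) == "SUCCESS" for i in ids)
            snapshots.append({"total": total, "done": done})
            # Rough analogue of results.ready(); the deadline keeps the
            # ignore_result case from looping forever.
            if done == total or time.monotonic() > deadline:
                break
            time.sleep(0.01)
    return snapshots
```

copy_media(['a', 'b', 'c']) ends with done equal to total, while copy_media(['a', 'b'], ignore_result=True, timeout=0.1) only stops because of the deadline, with done still 0. One caveat carries over to the real code: a parent that polls like this holds a worker slot for the whole run, and Celery's task documentation advises against waiting synchronously on subtasks from inside a task, since the pool can deadlock if every slot is held by a waiting parent.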
answered Nov 09 '22 12:11 by Etienne