
Schedule Celery task to run after other task(s) complete

Tags:

celery

I want to accomplish something like this:

results = []
for i in range(N):
    data = generate_data_slowly()
    res = tasks.process_data.apply_async(args=(data,))
    results.append(res)
celery.collect(results).then(tasks.combine_processed_data())

i.e., launch asynchronous tasks over a long period of time, then schedule a dependent task that will only be executed once all of the earlier tasks are complete.

I've looked at things like chain and chord, but it seems like they only work if you can construct your task graph completely upfront.
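For reference, the launch-then-combine shape described above can be sketched in a single process with the standard library's concurrent.futures. This is not Celery, and the function bodies are hypothetical stand-ins for the real tasks; it only illustrates submitting work incrementally and running a dependent step once every earlier result is in:

```python
from concurrent.futures import ThreadPoolExecutor, wait

def process_data(data):
    # Stand-in for tasks.process_data (hypothetical body).
    return data * 2

def combine_processed_data(values):
    # Stand-in for tasks.combine_processed_data (hypothetical body).
    return sum(values)

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = []
    for i in range(5):
        data = i  # stand-in for generate_data_slowly()
        futures.append(pool.submit(process_data, data))
    # Block until every earlier task has finished, then run the dependent step.
    wait(futures)
    combined = combine_processed_data([f.result() for f in futures])

print(combined)
```

The list of futures grows while data is still being generated, which is the part that is awkward to express when the whole task graph must be built before anything is launched.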

asked Sep 02 '25 17:09 by so12311


1 Answer

For anyone interested, I ended up using this snippet:

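@app.task(bind=True, max_retries=None)
def wait_for(self, task_id_or_ids):
    # Accept either a single task id (a string) or an iterable of task ids.
    if isinstance(task_id_or_ids, str):
        task_ids = [task_id_or_ids]
    else:
        task_ids = list(task_id_or_ids)

    ready = all(app.AsyncResult(task_id).ready() for task_id in task_ids)

    if not ready:
        # Poll again with exponential backoff; max_retries=None retries forever.
        raise self.retry(countdown=2 ** self.request.retries)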

And wrote the workflow something like this:

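task_ids = []
for i in range(N):
    # Immutable signatures (.si): each step receives only i, not the previous result.
    task = generate_data_slowly.si(i) | process_data.si(i)
    # delay() on a chain returns the AsyncResult of its last task.
    task_ids.append(task.delay().id)

# wait_for must be a signature here; calling wait_for(task_ids) would run it inline.
final_task = wait_for.si(task_ids) | combine_processed_data.si()
final_task.delay()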
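
One thing to keep in mind with wait_for: countdown=2**self.request.retries doubles the polling delay on every retry, so for a long-running batch the final task can start well after the last earlier task finishes. A minimal sketch of that arithmetic in plain Python (no Celery involved; backoff_schedule and max_delay are hypothetical names, and the cap is an optional variation rather than part of the snippet above):

```python
from itertools import accumulate

def backoff_schedule(retries, max_delay=None):
    """Delays in seconds that countdown=2**n yields for n = 0..retries-1.

    max_delay is a hypothetical cap, not part of the original snippet.
    """
    delays = [2 ** n for n in range(retries)]
    if max_delay is not None:
        delays = [min(d, max_delay) for d in delays]
    return delays

uncapped = backoff_schedule(10)
capped = backoff_schedule(10, max_delay=60)

# Cumulative seconds elapsed when each successive poll would fire.
print(list(accumulate(uncapped)))
print(list(accumulate(capped)))
```

If the growing gap matters, one option would be to cap the delay in the task itself, e.g. countdown=min(2 ** self.request.retries, 60), where 60 is an arbitrary example value.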
answered Sep 05 '25 14:09 by so12311


