I have a large CSV file that I split into a list of chunks of 100,000 rows each. Each chunk is passed to a function that does complex calculations, and the result is appended to a global_list. When the last chunk is finished, I take the global_list and run some statistics on it. How can I ask Celery to process all the chunks in parallel, but wait until the last task/chunk is finished before executing complex_calc on the global_list?
Thank you for your help
for chunk in global_chunk_list:
    func_calc.delay(chunk)  # <<<<< use Celery tasks

complex_calc(global_list)  # <<<<< should only start when processing of the last chunk is finished
@celery.task(name='func_calc')
def func_calc(chunk):
    ...
    # save the chunk's result in a global list
    global_list.append(result)

def complex_calc(global_list):
    ...
The appropriate method is to use a group and the join method to wait on a set of parallel tasks to finish executing.
from celery import group

task_group = group([func_calc.s(chunk) for chunk in global_chunk_list])
result_group = task_group.apply_async()
results = result_group.join()  # wait for all results
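Putting this together with the code from the question, a minimal sketch might look like the following (it reuses the question's celery app object and names; do_chunk_work is a hypothetical stand-in for the real per-chunk calculation). The important change is that func_calc returns its result instead of appending to global_list: tasks run in separate worker processes that do not share the caller's memory, so the list returned by join takes the place of the global list.

from celery import group

@celery.task(name='func_calc')
def func_calc(chunk):
    # Return the value rather than appending to a shared global list;
    # do_chunk_work is a hypothetical placeholder for the real calculation.
    return do_chunk_work(chunk)

def process_chunks(global_chunk_list):
    task_group = group(func_calc.s(chunk) for chunk in global_chunk_list)
    result_group = task_group.apply_async()
    results = result_group.join()  # blocks until the last task has finished
    complex_calc(results)          # runs only after every chunk is done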
See also the example from the docs. (One difference: the code above uses join rather than get; join waits for all the tasks to finish.) See also this answer.
>>> from celery import group
>>> from tasks import add
>>> job = group([
... add.s(2, 2),
... add.s(4, 4),
... add.s(8, 8),
... add.s(16, 16),
... add.s(32, 32),
... ])
>>> result = job.apply_async()
>>> result.ready() # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.get()
[4, 8, 16, 32, 64]
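As an alternative worth mentioning (not used in the answer above), Celery also has a chord primitive: a group plus a callback task that fires automatically once every task in the group has finished. This avoids blocking the calling process on join, but it requires complex_calc to be registered as a task itself. A minimal sketch:

from celery import chord

@celery.task(name='complex_calc')
def complex_calc(results):
    # 'results' arrives as the ordered list of func_calc return values;
    # the body here is a placeholder for the real statistics.
    ...

# The callback runs only after the last chunk task has finished.
chord(func_calc.s(chunk) for chunk in global_chunk_list)(complex_calc.s())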
To do this effectively, you'll need to have a result backend configured.
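For example, a minimal configuration sketch, assuming a local Redis instance (the URLs are assumptions; any supported backend such as RPC or a database works as well). Without the backend setting, join and get have nowhere to read the task results from:

from celery import Celery

# Broker and backend URLs are assumptions; point them at your own instance.
celery = Celery('tasks',
                broker='redis://localhost:6379/0',
                backend='redis://localhost:6379/0')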