 

Celery - chaining groups and subtasks -> out-of-order execution

When I have something like the following:

    group1 = group(task1.si(), task1.si(), task1.si())
    group2 = group(task2.si(), task2.si(), task2.si())

    workflow = chain(group1, group2, task3.si())

The intuitive interpretation is that task3 should only execute after all tasks in group2 have finished.

In reality, task3 executes after group1 has started but before it has completed.

What am I doing wrong?

asked Feb 27 '13 at 22:02 by w--




1 Answer

So, as it turns out, in Celery you cannot chain two groups together directly.
I suspect this is because a group chained with a task is automatically upgraded to a chord. From the Celery docs (http://docs.celeryproject.org/en/latest/userguide/canvas.html):

"Chaining a group together with another task will automatically upgrade it to be a chord."
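The chord contract the docs describe can be pictured with a plain-Python toy model. This is not Celery code: `run_chord` is a hypothetical helper that uses `concurrent.futures` threads in place of workers. It only illustrates what a chord promises, namely that the callback runs once, and only after every header task has finished, receiving the list of their results.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_chord(header: List[Callable[[], T]], callback: Callable[[List[T]], R]) -> R:
    """Toy chord (hypothetical helper, not Celery's API).

    Runs every header callable in parallel, waits for all of them,
    then calls `callback` once with the list of results in header order.
    """
    with ThreadPoolExecutor(max_workers=max(1, len(header))) as pool:
        futures = [pool.submit(fn) for fn in header]
        # Barrier: .result() blocks until each header task has finished.
        results = [f.result() for f in futures]
    return callback(results)


if __name__ == "__main__":
    squares = [lambda i=i: i * i for i in range(4)]
    print(run_chord(squares, sum))  # 0 + 1 + 4 + 9 -> 14
```

In Celery terms the header plays the role of the group and the callback is the chord body; a real chord relies on a result backend, not on threads being joined, to know when the header is done.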

Groups return a parent task. My guess at what happens when two groups are chained: once the first group completes, the chord starts its callback "task", and that "task" is actually the parent task of the second group. That parent task probably completes as soon as it has kicked off all the subtasks in the group, so the next item after the second group executes without waiting for those subtasks to finish.
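The suspected mechanism can be made concrete with a toy model in plain Python, with threads standing in for workers and no Celery involved. `run_chain`, `fake_task` and the rule "a group that directly follows another group counts as done once its members are dispatched" are all hypothetical: they encode the guess above, not Celery's actual implementation.

```python
import threading
import time
from typing import List, Union

# A stage is either one task name or a list of names forming a group.
Stage = Union[str, List[str]]


def fake_task(name: str, log: List[str], lock: threading.Lock, seconds: float = 0.1) -> None:
    """Stand-in for a worker running one task: log start, sleep, log end."""
    with lock:
        log.append(f"start {name}")
    time.sleep(seconds)
    with lock:
        log.append(f"end {name}")


def run_chain(stages: List[Stage]) -> List[str]:
    """Run stages in order; members of a group run in parallel threads.

    Hypothetical rule (the guess above): a group that directly follows
    another group is treated as finished once its members are dispatched,
    so the next stage does not wait for them.
    """
    log: List[str] = []
    lock = threading.Lock()
    stragglers: List[threading.Thread] = []
    prev_was_group = False
    for stage in stages:
        if isinstance(stage, str):
            fake_task(stage, log, lock)  # a single task runs to completion
            prev_was_group = False
            continue
        threads = [threading.Thread(target=fake_task, args=(name, log, lock)) for name in stage]
        for t in threads:
            t.start()
        if prev_was_group:
            stragglers.extend(threads)  # "parent" returns right after dispatch
        else:
            for t in threads:
                t.join()  # barrier: wait for every member of the group
        prev_was_group = True
    for t in stragglers:
        t.join()  # only so the returned log is complete
    return log


if __name__ == "__main__":
    print(run_chain([["g1a", "g1b"], ["g2a", "g2b"], "task3"]))
    print(run_chain([["g1a", "g1b"], "sep", ["g2a", "g2b"], "task3"]))
```

With the two groups back to back, `start task3` appears before the second group's `end` entries; with a single task between the groups (the same trick as `chn` in the experiment that follows), task3 only starts after both groups have finished.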

To demonstrate this, here is some sample code. You'll need a Celery worker (and its broker) already running.

    # celery_experiment.py
    import logging
    import random
    import time

    # Celery 3.x-era imports: the `task` decorator was removed in Celery 5.0,
    # where `shared_task` (or `app.task`) is used instead.
    from celery import task, group, chain, chord
    from celery.signals import task_postrun, task_prerun

    random.seed()

    logging.basicConfig(level=logging.DEBUG)

    ### HANDLERS ###

    @task_prerun.connect()
    def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
        try:
            logging.info('[%s] starting' % kwargs['id'])
        except (KeyError, TypeError):  # ignore tasks not given an `id` kwarg
            pass

    @task_postrun.connect()
    def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
        try:
            logging.info('[%s] finished' % kwargs['id'])
        except (KeyError, TypeError):  # ignore tasks not given an `id` kwarg
            pass


    def random_sleep(id):
        slp = random.randint(1, 3)
        logging.info('[%s] sleep for %ssecs' % (id, slp))
        time.sleep(slp)

    @task()
    def thing(id):
        logging.info('[%s] begin' % id)
        random_sleep(id)
        logging.info('[%s] end' % id)


    def exec_exp():
        st = thing.si(id='st')
        st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c'),]
        st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b'),]
        st2 = thing.si(id='st2')
        st3 = thing.si(id='st3')
        st4 = thing.si(id='st4')

        grp1 = group(st_arr)
        grp2 = group(st_arr2)

        # chn can chain two groups together because they are separated by a single subtask
        chn = (st | grp1 | st2 | grp2 | st3 | st4)

        # in chn2 you can't chain two groups together: st3 will start before grp2 finishes
        #chn2 = (st | st2 | grp1 | grp2 | st3 | st4)

        r = chn()
        #r2 = chn2()
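Reading the interleaved worker output by eye is error-prone, so a small checker can help. This is a hypothetical helper, not part of Celery: it assumes the worker log contains the `[<id>] starting` / `[<id>] finished` lines written by the two signal handlers above, and it treats line order in the log as event order, which is only approximate when several worker processes write to the same log.

```python
import re
from typing import Iterable, List, Tuple

# Matches the "[<id>] starting" / "[<id>] finished" lines written by the
# task_prerun / task_postrun handlers; other lines are ignored.
EVENT_RE = re.compile(r"\[(?P<id>[^\]]+)\] (?P<event>starting|finished)")


def parse_events(lines: Iterable[str]) -> List[Tuple[str, str]]:
    """Return (task_id, event) pairs in log order."""
    events = []
    for line in lines:
        m = EVENT_RE.search(line)
        if m:
            events.append((m.group("id"), m.group("event")))
    return events


def starts_after_all(lines: Iterable[str], later: str, earlier: List[str]) -> bool:
    """True if task `later` starts only after every id in `earlier` has finished."""
    position = {ev: i for i, ev in enumerate(parse_events(lines))}
    start = position.get((later, "starting"))
    if start is None:
        raise ValueError(f"no start event for {later!r}")
    for task_id in earlier:
        end = position.get((task_id, "finished"))
        if end is None or end > start:
            return False
    return True
```

For example, `starts_after_all(open('worker.log'), 'st3', ['st_arr2_a', 'st_arr2_b'])`, with `worker.log` a hypothetical file holding the worker output, should return `True` for `chn` and, if the behaviour described above holds, `False` for `chn2`.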
answered Sep 24 '22 at 02:09 by w--