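When I have something like the following:

```python
from celery import chain, group

# task1, task2 and task3 are my own Celery tasks (definitions not shown)
group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())
workflow = chain(group1, group2, task3.si())
```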
The intuitive interpretation is that task3 should only execute after all tasks in group2 have finished.
In reality, task3 executes after group1 has started but before it has completed.
What am I doing wrong?
Celery workers are processes that run tasks independently of one another and outside the context of your main service. Celery beat is a scheduler that decides when tasks are run; you can use it to schedule periodic tasks.
revoke cancels a task's execution: if a task is revoked, the workers ignore it and do not execute it. If you don't use persistent revokes, a revoked task can still be executed after a worker restarts. revoke also has a terminate option, which is False by default; setting it to True additionally terminates the worker process that is already executing the task.
Celery is a framework for running asynchronous tasks in your application. It is written in Python and makes it easy to move work out of a web app's synchronous request lifecycle and onto a pool of task workers that perform the jobs asynchronously.
Celery needs a result backend to store the state of your tasks if you want to track them. There are two main models for the result backend: RPC (e.g. RabbitMQ/Qpid) or a database. Each has its pros and cons, so check the documentation to pick the right one for your application.
So, as it turns out, in Celery you cannot chain two groups together directly.
I suspect this is because a group chained with a task is automatically upgraded to a chord.
From the Celery docs (http://docs.celeryproject.org/en/latest/userguide/canvas.html):
"Chaining a group together with another task will automatically upgrade it to be a chord."
Groups return a parent task. When two groups are chained together, I suspect that once the first group completes, the chord starts its callback "task", and that this "task" is actually the parent task of the second group. I further suspect that this parent task completes as soon as it has finished kicking off all the subtasks in the group, so the item after the second group is executed right away.
To demonstrate this, here is some sample code. You'll need an already running Celery instance.
```python
# celery_experiment.py
# Assumes a Celery app is already configured and a worker is running.
import logging
import random
import time

from celery import group, shared_task
from celery.signals import task_postrun, task_prerun

random.seed()
logging.basicConfig(level=logging.DEBUG)


### HANDLERS ###
@task_prerun.connect
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    # Log only tasks that were given an "id" keyword argument.
    label = (kwargs or {}).get('id')
    if label is not None:
        logging.info('[%s] starting', label)


@task_postrun.connect
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None,
                          retval=None, state=None, **kwds):
    label = (kwargs or {}).get('id')
    if label is not None:
        logging.info('[%s] finished', label)


def random_sleep(id):
    # Simulate work of random duration so overlapping execution shows up in the logs.
    slp = random.randint(1, 3)
    logging.info('[%s] sleep for %ss', id, slp)
    time.sleep(slp)


@shared_task
def thing(id):
    logging.info('[%s] begin', id)
    random_sleep(id)
    logging.info('[%s] end', id)


def exec_exp():
    st = thing.si(id='st')
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c')]
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b')]
    st2 = thing.si(id='st2')
    st3 = thing.si(id='st3')
    st4 = thing.si(id='st4')
    grp1 = group(st_arr)
    grp2 = group(st_arr2)
    # chn can chain two groups together because they are separated by a single subtask (st2)
    chn = (st | grp1 | st2 | grp2 | st3 | st4)
    # in chn2 the two groups are adjacent; what will happen is st3 will start before grp2 finishes
    # chn2 = (st | st2 | grp1 | grp2 | st3 | st4)
    r = chn.apply_async()
    # r2 = chn2.apply_async()
    return r
```