
Celery stop execution of a chain

I have a check_orders task that's executed periodically. It builds a group of tasks so that I can time how long the tasks took to execute and do something once they're all done (this is the purpose of res.join [1] and grouped_subs). The grouped tasks are pairs of chained tasks.

What I want is this: when the first task doesn't meet a condition (fails), the second task in the chain should not execute. I can't figure this out for the life of me, and I feel this is pretty basic functionality for a job queue manager. When I try the things I have commented out after [2] (raising exceptions, removing callbacks), we get stuck on the join() in check_orders for some reason (it breaks the group). I've also tried setting ignore_result to False for all these tasks, but it still doesn't work.

    @task(ignore_result=True)
    def check_orders():
        # check all the orders and send out appropriate notifications
        grouped_subs = []

        for thingy in things:
            ...

            grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )),
                                      notify.subtask((args_sub_2, ), immutable=True)))

        res = group(grouped_subs).apply_async()

        res.join()         #[1]
        logger.info('Done checking orders at %s' % current_task.request.id)

    @task(ignore_result=True)
    def is_room_open(args_sub_1):
        #something time consuming
        if http_req_and_parse(args_sub_1):
            # go on and do the notify task
            return True
        else:
            # [2]
            # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
            # None of the following things work:
            # is_room_open.update_state(state='FAILURE')
            # raise celery.exceptions.Ignore()
            # raise Exception('spam', 'eggs')
            # current_task.request.callbacks[:] = []
            pass

    @task(ignore_result=True)
    def notify(args_sub_2):
        # something else time consuming, only do this if the first part of the chain
        # passed a test (the chained tasks before this were 'successful')
        notify_user(args_sub_2)
Salami asked Jul 04 '13 03:07



1 Answer

In my opinion this is a common use-case that doesn't get enough love in the documentation.

Assuming you want to abort a chain midway while still reporting SUCCESS as the status of the completed tasks, without sending any error log or the like (otherwise you can just raise an exception), one way to accomplish this is:

    @app.task(bind=True)  # Note that we need bind=True for self to work
    def task1(self, other_args):
        #do_stuff
        if end_chain:
            self.request.callbacks = None
            return
        #Other stuff to do if end_chain is False

So in your example:

    @app.task(ignore_result=True, bind=True)
    def is_room_open(self, args_sub_1):
        #something time consuming
        if http_req_and_parse(args_sub_1):
            # go on and do the notify task
            return True
        else:
            self.request.callbacks = None

This will work. Note that instead of subtask() with immutable=True you can use the shortcut .si(), as stated by @abbasov-alexander.
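
To illustrate why clearing the callbacks has this effect, here is a toy model in plain Python. It is not Celery code and makes no claim about Celery's internals: `FakeRequest` and `run_chain` are hypothetical names, and the sketch simply assumes that a worker runs a task and then dispatches whatever callbacks are still attached to that task's request. A task that sets the callbacks to None returns normally, so it is still recorded as completed, but nothing runs after it.

```python
# Toy model only: NOT Celery code. FakeRequest and run_chain are
# hypothetical names used to illustrate the idea from the answer.


class FakeRequest:
    """Stand-in for the request object a bound task sees as self.request."""

    def __init__(self, callbacks):
        self.callbacks = callbacks


def run_chain(tasks, arg, log):
    """Run the first task, then whatever callbacks it left on its request."""
    if not tasks:
        return
    first, rest = tasks[0], list(tasks[1:])
    request = FakeRequest(callbacks=rest)
    first(request, arg)
    # The task returned without raising, so it counts as completed.
    log.append(first.__name__)
    # If the task set request.callbacks = None, there is nothing to dispatch.
    if request.callbacks:
        run_chain(request.callbacks, arg, log)


def is_room_open(request, room_is_open):
    """First link: continue when the room is open, otherwise cut the chain."""
    if room_is_open:
        return True
    request.callbacks = None  # same trick as in the answer above


def notify(request, _arg):
    """Second link: should only run when is_room_open let the chain go on."""
    return None


if __name__ == "__main__":
    open_log, closed_log = [], []
    run_chain([is_room_open, notify], True, open_log)
    run_chain([is_room_open, notify], False, closed_log)
    print(open_log)
    print(closed_log)
```

In the first run both tasks are logged; in the second only is_room_open is, and no exception is raised anywhere, which is the behaviour the answer aims for.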

Edited to work with EAGER mode, as suggested by @PhilipGarnero in the comments.

AntonioMO answered Sep 19 '22 14:09