I have a check_orders task that's executed periodically. It builds a group of tasks so that I can time how long executing them took, and do something when they're all done (this is the purpose of res.join [1] and grouped_subs). The tasks that are grouped are pairs of chained tasks.
What I want is that when the first task in a chain doesn't meet a condition (fails), the second task in that chain is not executed. I can't figure this out for the life of me, and I feel this is pretty basic functionality for a job queue manager. When I try the things I have commented out after [2] (raising exceptions, removing callbacks), we get stuck on the join() in check_orders for some reason (it breaks the group). I've also tried setting ignore_result to False for all these tasks, but it still doesn't work.
    @task(ignore_result=True)
    def check_orders():
        # check all the orders and send out appropriate notifications
        grouped_subs = []

        for thingy in things:
            ...
            grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )),
                                      notify.subtask((args_sub_2, ), immutable=True)))

        res = group(grouped_subs).apply_async()

        res.join()  # [1]
        logger.info('Done checking orders at %s' % current_task.request.id)

    @task(ignore_result=True)
    def is_room_open(args_sub_1):
        # something time consuming
        if http_req_and_parse(args_sub_1):
            # go on and do the notify task
            return True
        else:
            # [2]
            # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
            # None of the following things work:
            # is_room_open.update_state(state='FAILURE')
            # raise celery.exceptions.Ignore()
            # raise Exception('spam', 'eggs')
            # current_task.request.callbacks[:] = []
            pass  # placeholder so the branch is valid Python

    @task(ignore_result=True)
    def notify(args_sub_2):
        # something else time consuming, only do this if the first part of the chain
        # passed a test (the chained tasks before this were 'successful')
        notify_user(args_sub_2)
In my opinion this is a common use-case that doesn't get enough love in the documentation.
Assuming you want to abort a chain midway while still reporting SUCCESS as the status of the completed tasks, without sending any error log or whatnot (otherwise you can just raise an exception), a way to accomplish this is:
    @app.task(bind=True)  # Note that we need bind=True for self to work
    def task1(self, other_args):
        # do_stuff
        if end_chain:
            self.request.callbacks = None
            return
        # Other stuff to do if end_chain is False
So in your example:
    @app.task(ignore_result=True, bind=True)
    def is_room_open(self, args_sub_1):
        # something time consuming
        if http_req_and_parse(args_sub_1):
            # go on and do the notify task
            return True
        else:
            self.request.callbacks = None
This will work. Note that instead of immutable=True and subtask() you can use the shortcut .si(), as stated by @abbasov-alexander.
Edited to work with EAGER mode, as suggested by @PhilipGarnero in the comments.
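To see why clearing the callbacks stops the chain without marking anything as failed, here is a toy model in plain Python. It does not use Celery at all: Request, Step, run and run_pair are hypothetical names, and the real worker is far more involved. The only assumption carried over is that the next link of the chain is stored as a callback on the current task's request.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class Request:
    """Hypothetical stand-in for a task request: holds the follow-up steps."""
    callbacks: Optional[List["Step"]] = None


@dataclass
class Step:
    """One link of the toy chain: a function plus the steps that follow it."""
    func: Callable[[Request, object], object]
    callbacks: List["Step"] = field(default_factory=list)


def run(step, arg, log):
    """Run one step, then dispatch whatever callbacks remain on its request."""
    request = Request(callbacks=list(step.callbacks))
    result = step.func(request, arg)
    log.append(step.func.__name__)  # the step itself always completes normally
    for follow_up in request.callbacks or []:
        run(follow_up, result, log)
    return log


def is_room_open(request, room_is_open):
    if not room_is_open:
        request.callbacks = None  # drop the rest of the chain, no exception
        return None
    return True


def notify(request, _previous_result):
    return "notified"


def run_pair(room_is_open):
    """Build the two-step chain and return the names of the steps that ran."""
    first = Step(is_room_open, callbacks=[Step(notify)])
    return run(first, room_is_open, [])


print(run_pair(True))   # ['is_room_open', 'notify']
print(run_pair(False))  # ['is_room_open']
```

In Celery 4 and later, as far as I know, the remaining links of a chain travel in self.request.chain rather than self.request.callbacks, so that may be the attribute to clear there. Check this against the version you run.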