
How to make a celery task fail from within the task?

Under some conditions, I want to make a celery task fail from within that task. I tried the following:

    from celery.task import task
    from celery import states

    @task()
    def run_simulation():
        if some_condition:
            run_simulation.update_state(state=states.FAILURE)
            return False

However, the task still reports to have succeeded:

Task sim.tasks.run_simulation[9235e3a7-c6d2-4219-bbc7-acf65c816e65] succeeded in 1.17847704887s: False

It seems that the state can only be modified while the task is running. Once the task completes, Celery overwrites the state with whatever it deems the outcome to be (refer to this question). Is there any way, without failing the task by raising an exception, to make Celery report that the task has failed?

Asked Oct 06 '11 at 09:10 by Meilo



2 Answers

To mark a task as failed without raising an error, update the task state to FAILURE and then raise Celery's Ignore exception. Returning any value would record the task as successful, whereas Ignore tells the worker not to record any further state. For example:

    from celery import Celery, states
    from celery.exceptions import Ignore

    app = Celery('tasks', broker='amqp://guest@localhost//')

    @app.task(bind=True)
    def run_simulation(self):
        if some_condition:
            # manually update the task state
            self.update_state(
                state=states.FAILURE,
                meta='REASON FOR FAILURE'
            )

            # ignore the task so no other state is recorded
            raise Ignore()

However, the best way is to raise an exception from your task. You can create a custom exception to track these failures:

    class TaskFailure(Exception):
        pass

And raise this exception from your task:

    if some_condition:
        raise TaskFailure('Failure reason')
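As a minimal sketch of the exception-based approach, the failure check can live in a plain function that the task body calls, which lets the failure path be exercised without a broker. The `reason` and `details` attributes and the `check_simulation` function are hypothetical names chosen for illustration; they are not part of Celery.

```python
class TaskFailure(Exception):
    """Raised so that the surrounding Celery task ends in the FAILURE state."""

    def __init__(self, reason, **details):
        # Pass only the reason to Exception so that exc.args stays simple
        super().__init__(reason)
        self.reason = reason
        # Hypothetical extra context; kept on the instance only
        self.details = details


def check_simulation(some_condition):
    """Hypothetical task body: raise on the failure condition, else return."""
    if some_condition:
        raise TaskFailure('Failure reason', step='validation')
    return True
```

Calling `check_simulation` from inside the decorated task and letting the exception propagate is what causes the task to be recorded as failed. Extra attributes such as `details` may not survive serialization through the result backend, so anything the caller must read back is probably safer in the exception message itself.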
Answered Oct 09 '22 at 23:10 by Pierre


I'd like to further expand on Pierre's answer as I've encountered some issues using the suggested solution.

While the suggested solution will terminate the task, any later attempt to query the state (for example, to fetch the 'REASON FOR FAILURE' value) will fail. To allow custom fields when updating a task's state to states.FAILURE, the meta must also mock the attributes that a real FAILURE state would have (notice exc_type and exc_message).

Below is a snippet for reference I took from: https://www.distributedpython.com/2018/09/28/celery-task-states/

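    import traceback

    from celery import states
    from celery.exceptions import Ignore

    # `app` is the Celery instance defined in the answer above
    @app.task(bind=True)
    def task(self):
        try:
            raise ValueError('Some error')
        except Exception as ex:
            # include the keys a real FAILURE result carries, plus custom fields
            self.update_state(
                state=states.FAILURE,
                meta={
                    'exc_type': type(ex).__name__,
                    'exc_message': traceback.format_exc().split('\n'),
                    'custom': '...'
                })
            # ignore the task so no other state is recorded
            raise Ignore()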
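If several tasks need the same pattern, the meta dictionary can be built by a small helper. The following is only a sketch: `build_failure_meta` is a hypothetical name, and it uses `traceback.format_exception` rather than `traceback.format_exc` so that it also works when called outside the `except` block.

```python
import traceback


def build_failure_meta(exc, **custom):
    """Return a meta dict shaped like the one in the snippet above.

    Custom fields are copied first so that they can never overwrite
    the exc_type and exc_message keys.
    """
    meta = dict(custom)
    formatted = ''.join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    meta.update({
        'exc_type': type(exc).__name__,
        'exc_message': formatted.split('\n'),
    })
    return meta
```

Inside the task, the result would be passed as `meta=build_failure_meta(ex, custom='...')` in the `self.update_state` call, followed by `raise Ignore()` as before.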
Answered Oct 09 '22 at 23:10 by Idan Mann