
Airflow default on_failure_callback


In my DAG file, I have defined an on_failure_callback() function that posts a message to Slack when a task fails.

It works well if I specify it on each operator in my DAG: on_failure_callback=on_failure_callback

Is there a way to automate (via default_args for instance, or via my DAG object) the dispatch to all of my operators?

Pierre CORBEL asked Jul 25 '17 19:07





1 Answer

I finally found a way to do that.

You can pass your on_failure_callback in default_args, and it is then applied to every operator in the DAG:

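    from airflow.models import Variable
    # Import path as of Airflow 1.x (assumption); newer releases ship this
    # operator in the Slack provider package.
    from airflow.operators.slack_operator import SlackAPIPostOperator


    class Foo:
        @staticmethod
        def get_default_args():
            """
            Return the default args shared by every operator in the DAG.
            :return: default_args
            """
            default_args = {
                'on_failure_callback': Foo.on_failure_callback
            }
            return default_args

        @staticmethod
        def on_failure_callback(context):
            """
            Post a message to Slack when a task in the workflow fails.
            :param context: task context passed in by Airflow
            :return: result of operator.execute
            """
            operator = SlackAPIPostOperator(
                task_id='failure',
                text=str(context['task_instance']),
                token=Variable.get("slack_access_token"),
                channel=Variable.get("slack_channel")
            )
            return operator.execute(context=context)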
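For completeness, here is a minimal sketch of how such default args might be wired into a DAG. It is an illustration under assumptions, not part of the original answer: the DAG id, task ids, and bash commands are hypothetical, the callback just prints instead of posting to Slack, and the import paths inside build_dag() are the Airflow 2.x ones (they differ in other versions).

```python
from datetime import datetime


def on_failure_callback(context):
    """Called by Airflow with the task context when a task fails."""
    # Placeholder notification; the answer above posts to Slack here instead.
    print(f"Task failed: {context['task_instance']}")


# Keys in default_args are passed to every operator attached to the DAG,
# unless an operator sets the same argument explicitly.
default_args = {
    "start_date": datetime(2017, 7, 25),
    # Pass the function itself, not the result of calling it.
    "on_failure_callback": on_failure_callback,
}


def build_dag():
    """Build a hypothetical two-task DAG; requires Airflow to be installed."""
    # Airflow 2.x import paths (assumption); Airflow 1.x uses
    # airflow.operators.bash_operator instead.
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    dag = DAG(dag_id="example_failure_callback", default_args=default_args)
    # Neither task sets on_failure_callback; both inherit it from default_args.
    BashOperator(task_id="succeeds", bash_command="true", dag=dag)
    BashOperator(task_id="fails", bash_command="false", dag=dag)
    return dag
```

In a real DAG file you would assign the result at module level (dag = build_dag()) so that Airflow can discover it. An operator that needs different behaviour can still pass its own on_failure_callback, which takes precedence over the default.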
Pierre CORBEL answered Sep 20 '22 22:09
