In my DAG file, I have define a on_failure_callback() function to post a Slack in case of failure.
It works well if I specify for each operator in my DAG : on_failure_callback=on_failure_callback()
Is there a way to automate (via default_args for instance, or via my DAG object) the dispatch to all of my operators?
on_failure_callback. Invoked when the task fails. sla_miss_callback. Invoked when a task misses its defined SLA. on_retry_callback.
Since operators create objects that become nodes in the dag, BaseOperator contains many recursive methods for dag crawling behavior. To derive this class, you are expected to override the constructor as well as the 'execute' method.
I finally found a way to do that.
You can pass your on_failure_callback as a default_args
class Foo: @staticmethod def get_default_args(): """ Return default args :return: default_args """ default_args = { 'on_failure_callback': Foo.on_failure_callback } return default_args @staticmethod def on_failure_callback(context): """ Define the callback to post on Slack if a failure is detected in the Workflow :return: operator.execute """ operator = SlackAPIPostOperator( task_id='failure', text=str(context['task_instance']), token=Variable.get("slack_access_token"), channel=Variable.get("slack_channel") ) return operator.execute(context=context)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With