When I first worked with the DAG callbacks (on_failure_callback and on_success_callback), I thought they would fire on the success or fail status when the DAG finishes (as they are defined in the DAG). But they seem to be invoked for every task instance and not for the DAG run, so if a DAG has N tasks, these callbacks are triggered N times.
I'm trying to capture the task ID and send it to Slack. After reading another related question, I came up with the following:
    def success_msg(context):
        slack.slack_message(context['task_instance'])  # send task-id to slack

    def failure_msg(context):
        slack.slack_message(context['task_instance'])  # send task-id to slack

    default_args = {
        [...]
        'on_failure_callback': failure_msg,
        'on_success_callback': success_msg,
        [...]
    }
But it fails. How should I parse the context variables to get the task ID?
You can access the task through the task object in the context: context['task'] should be the appropriate way to do this. To get the task name, use its task_id attribute:
    context['task'].task_id
To find more objects available in the context, you can walk through the list here: https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
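Applied to the callbacks from the question, this could look like the minimal sketch below. It is only an illustration: send_to_slack is a hypothetical stand-in for the asker's slack.slack_message helper (whose signature isn't shown), and the context dict is simulated with a single 'task' key so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace

# Stand-in for Slack so the sketch runs without Airflow or a Slack client.
sent_messages = []


def send_to_slack(text):
    """Hypothetical replacement for slack.slack_message from the question."""
    sent_messages.append(text)


def success_msg(context):
    # context['task'] is the task object; task_id is its name.
    send_to_slack(f"Task succeeded: {context['task'].task_id}")


def failure_msg(context):
    send_to_slack(f"Task failed: {context['task'].task_id}")


# Same wiring as in the question: callbacks set in default_args
# are applied to every task that uses these defaults.
default_args = {
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
}

# Simulated context containing only the key used here (an assumption;
# the real context passed by Airflow holds many more entries).
fake_context = {'task': SimpleNamespace(task_id='extract_data')}
success_msg(fake_context)
failure_msg(fake_context)
print(sent_messages)
```

The point is that the callback passes a plain string (the task_id) to the messaging helper rather than the whole object from the context.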