When I first worked with DAG callbacks (on_failure_callback and on_success_callback), I expected them to fire once, on success or failure, when the DAG finishes (since they are defined on the DAG).
But they seem to be invoked for every task instance rather than for the DAG run, so a DAG with N tasks triggers these callbacks N times.
I'm trying to capture the task ID and send it to Slack. After reading another related question, I came up with the following:
def success_msg(context):
    slack.slack_message(context['task_instance'])  # send task-id to slack

def failure_msg(context):
    slack.slack_message(context['task_instance'])  # send task-id to slack

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}
But it fails. How should I read the context variables to get the task ID?
You can get the task through the task object in the context.
context['task'] is the appropriate key. To get the task name, use its task_id attribute:
context['task'].task_id
To find the other objects available in the context, see the list here: https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
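Putting the lookup above into the callbacks might look roughly like the sketch below. It only builds the message text; build_message and fake_context are hypothetical names added for illustration, and fake_context is a stand-in for the dict Airflow passes to a callback, so the snippet runs without Airflow installed. In the real DAG you would pass the resulting string to your own slack.slack_message helper.

```python
from types import SimpleNamespace


def build_message(context, status):
    """Return a short status line for the task that triggered the callback."""
    # context['task'] is the task (operator) object; task_id is a plain string.
    task_id = context['task'].task_id
    return f"Task {task_id} {status}"


def success_msg(context):
    # In the DAG, send this string with slack.slack_message(...) instead.
    return build_message(context, "succeeded")


def failure_msg(context):
    return build_message(context, "failed")


# Hypothetical stand-in for the callback context (illustration only).
fake_context = {'task': SimpleNamespace(task_id='extract_data')}
print(failure_msg(fake_context))
```

The point of the sketch is that the callback sends a string (the task_id) rather than the whole task or task instance object, which is what the original code was passing.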