I have a DAG, and whenever it succeeds or fails I want it to trigger a method that posts to Slack.
My DAG args look like this:
default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}
And the DAG definition itself:
dag = DAG(
    dag_id=dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
)
But when I check Slack there are more than 100 messages each minute, as if the callbacks were being evaluated at every scheduler heartbeat. Every log shows both the success and the failure method running, as if the same task instance had both succeeded and failed, which is wrong.
How should I properly use on_failure_callback and on_success_callback to handle DAG statuses and call a custom method?
The messages are being created because you are executing the functions when you define default_args. You need to pass the function itself, without calling it.
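To see why the messages fire immediately, here is a minimal plain-Python sketch with no Airflow involved. The function notify is a hypothetical stand-in for slack.slack_message that records calls instead of posting anything.

```python
calls = []

def notify(message):
    """Hypothetical stand-in for slack.slack_message: records instead of posting."""
    calls.append(message)
    # No return statement, so the call evaluates to None.

# Calling the function inside the dict literal runs it right away,
# and the dict stores its return value (None), not the function.
eager_args = {"on_failure_callback": notify("sad")}
calls_after_eager = list(calls)

# Passing the bare name stores the function object; nothing new runs here.
lazy_args = {"on_failure_callback": notify}
calls_after_lazy = list(calls)
```

In the eager form the message is sent each time the file is evaluated, which would match the flood of Slack messages if the DAG file is re-parsed regularly by the scheduler.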
Since the function takes an argument, it gets a little trickier. You can define either two partial functions or two wrapper functions.
With partial functions:
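from functools import partial

# Airflow calls each callback with a context dict, so slack.slack_message
# must also accept that context as a second positional argument.
success_msg = partial(slack.slack_message, happy_message)
failure_msg = partial(slack.slack_message, sad_message)

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}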
or
# Airflow passes a context dict to each callback, so the wrappers accept it.
def success_msg(context):
    slack.slack_message(happy_message)

def failure_msg(context):
    slack.slack_message(sad_message)

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}
In either approach, note that only the function objects failure_msg and success_msg are passed, not the results they return when called.
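As a rough check of both approaches, the sketch below simulates how a callback would be invoked with a context dict. It is plain Python, not Airflow: slack_message is a hypothetical stand-in that records messages, its optional context parameter is an assumption, and fake_context is an illustrative placeholder.

```python
from functools import partial

sent = []

def slack_message(message, context=None):
    """Hypothetical stand-in for slack.slack_message; records instead of posting.

    The optional second parameter is an assumption so that the partial
    version can receive the context dict.
    """
    sent.append(message)

# Wrapper approach: accepts the context and ignores it.
def failure_msg(context):
    slack_message("sad")

# Partial approach: the context arrives as the second positional argument.
success_msg = partial(slack_message, "happy")

default_args = {
    "on_failure_callback": failure_msg,
    "on_success_callback": success_msg,
}

# Defining the dict has not sent anything yet.
nothing_sent_at_definition = (sent == [])

# Simulate the framework invoking each callback with a context dict.
fake_context = {"task_instance": "hypothetical placeholder"}
default_args["on_success_callback"](fake_context)
default_args["on_failure_callback"](fake_context)
```

Nothing is sent while default_args is built; each message goes out only when its callback is actually invoked.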
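default_args is expanded at the task level, so callbacks set there become per-task callbacks. To get a callback for the DAG as a whole, pass on_success_callback and on_failure_callback as arguments to DAG(...) itself, outside of default_args.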