How can I configure Airflow so that any failure in the DAG will (immediately) result in a slack message?
At this moment I manage it by creating a slack_failed_task:
slack_failed_task = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
trigger_rule='one_failed',
token="...",
text = ':red_circle: DAG Failed',
icon_url = 'http://airbnb.io/img/projects/airflow3.png',
dag=dag)
And set this task (one_failed) upstream from each other task in the DAG:
slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e
It works, but it's error prone since forgetting to add the task will skip the slack notifications and seems like a lot of work.
Is there perhaps a way to expand on the email_on_failure
property in the DAG?
Bonus ;-) for including a way to pass the name of the failed task to the message.
Maybe this example will be helpful:
def slack_failed_task(contextDictionary, **kwargs):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel="#datalabs",
token="...",
text = ':red_circle: DAG Failed',
owner = '_owner',)
return failed_alert.execute
task_with_failed_slack_alerts = PythonOperator(
task_id='task0',
python_callable=<file to execute>,
on_failure_callback=slack_failed_task,
provide_context=True,
dag=dag)
Try the new SlackWebhookOperator which is there in Airflow version>=1.10.0
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
slack_msg="Hi Wssup?"
slack_test = SlackWebhookOperator(
task_id='slack_test',
http_conn_id='slack_connection',
webhook_token='/1234/abcd',
message=slack_msg,
channel='#airflow_updates',
username='airflow_'+os.environ['ENVIRONMENT'],
icon_emoji=None,
link_names=False,
dag=dag)
Note: Make sure you have slack_connection
added in your Airflow connections as
host=https://hooks.slack.com/services/
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With