
Airflow failed slack message

How can I configure Airflow so that any task failure in the DAG (immediately) results in a Slack message?

At the moment I handle this by creating a slack_failed_task:

slack_failed_task = SlackAPIPostOperator(
    task_id='slack_failed',
    channel="#datalabs",
    trigger_rule='one_failed',
    token="...",
    text=':red_circle: DAG Failed',
    icon_url='http://airbnb.io/img/projects/airflow3.png',
    dag=dag)

I then set this task (trigger rule one_failed) downstream of every other task in the DAG:

slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e

It works, but it is error-prone: forgetting to wire a new task to slack_failed_task silently skips the Slack notification for that task, and it is a lot of boilerplate.

Is there perhaps a way to expand on the email_on_failure property in the DAG?

Bonus ;-) for including a way to pass the name of the failed task to the message.

Tom Lous asked Nov 27 '22 08:11


2 Answers

Maybe this example will be helpful:

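# Airflow 1.x import paths
from airflow.operators.python_operator import PythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator


def slack_failed_task(context):
    # Airflow calls on_failure_callback with the failed task's context dictionary.
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#datalabs",
        token="...",
        text=':red_circle: DAG Failed',
        owner='_owner')
    # Call execute() so the message is actually sent; returning the bare method does nothing.
    return failed_alert.execute(context=context)


task_with_failed_slack_alerts = PythonOperator(
    task_id='task0',
    python_callable=<file to execute>,  # placeholder: the Python callable this task runs
    on_failure_callback=slack_failed_task,  # invoked when this task fails
    provide_context=True,
    dag=dag)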
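
For the bonus part of the question (naming the failed task in the message), the text can be built from the context dictionary that the callback receives. The following is a minimal sketch, not part of the original answer: build_failure_message and example_context are hypothetical names, and it assumes the context contains a 'task_instance' entry exposing task_id and dag_id, plus an 'execution_date' entry. The stand-in context only mimics those fields so the function can be tried outside Airflow.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Return Slack text that names the failed task.

    Assumes `context` is the dictionary Airflow passes to
    on_failure_callback, with a 'task_instance' entry exposing
    task_id and dag_id, and an 'execution_date' entry.
    """
    ti = context["task_instance"]
    return ":red_circle: Task {task} in DAG {dag} failed (execution date: {when})".format(
        task=ti.task_id,
        dag=ti.dag_id,
        when=context.get("execution_date"),
    )


# Stand-in context for trying the function outside Airflow (values are made up).
example_context = {
    "task_instance": SimpleNamespace(task_id="download_task_a", dag_id="example_dag"),
    "execution_date": "2018-01-01T00:00:00",
}

print(build_failure_message(example_context))
```

Inside the callback, the result of build_failure_message(context) can be passed as the text argument instead of the fixed ':red_circle: DAG Failed' string. To cover every task without wiring each one by hand, the callback can be placed in the DAG's default_args under the 'on_failure_callback' key, since default_args are applied to each operator in the DAG.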
Ravi H answered Dec 20 '22 19:12


Try the SlackWebhookOperator, which is available in Airflow 1.10.0 and later:

import os

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_msg = "Hi Wssup?"

slack_test = SlackWebhookOperator(
    task_id='slack_test',
    http_conn_id='slack_connection',
    webhook_token='/1234/abcd',
    message=slack_msg,
    channel='#airflow_updates',
    # Requires the ENVIRONMENT variable to be set; os.environ[...] raises KeyError otherwise.
    username='airflow_' + os.environ['ENVIRONMENT'],
    icon_emoji=None,
    link_names=False,
    dag=dag)

Note: Make sure a connection with the ID slack_connection exists in your Airflow connections, with:

host=https://hooks.slack.com/services/
Deep Nirmal answered Dec 20 '22 19:12