
Airflow DAG: Customized Email on Any Task Failure

Tags:

airflow

Is there any option to customize the email sent on any task failure in the DAG? There is an option like 'email_on_failure': True, but it doesn't provide a way to dynamically add content to the email subject or body.

My DAG looks like the one below:

import airflow

from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
import json
from datetime import timedelta
from datetime import datetime
from airflow.models import Variable

args = {
    'owner': 'airflow',
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'max_active_runs':10
}

dag = DAG(dag_id='TEST_DAG', default_args=args, schedule_interval='@once')

new_cluster = {
    'spark_version': '4.0.x-scala2.11',
    'node_type_id': 'Standard_D16s_v3',
    'num_workers': 3,
    'spark_conf':{
        'spark.hadoop.javax.jdo.option.ConnectionDriverName':'org.postgresql.Driver',
        .....
    },
    'custom_tags':{
        'ApplicationName':'TEST',
        .....
    }
}

t1 = DatabricksSubmitRunOperator(
  task_id='t1',
  dag=dag,
  new_cluster=new_cluster,
  ......
)

t2 = SimpleHttpOperator(
    task_id='t2',
    method='POST',
    ........    
)

t2.set_upstream(t1)

t3 = SimpleHttpOperator(
    task_id='t3',
    method='POST',
   .....
 )

t3.set_upstream(t2)

send_mail = EmailOperator (
    dag=dag,
    task_id="send_mail",
    to=["[email protected]"],
    subject=" Success",
    html_content='<h3>Success</h3>')

send_mail.set_upstream(t3)

In the success case, the send_mail task sends a customized email to the specified address.

But when any task fails, I also want to send a customized email to the specified address. That is not happening: on failure, the email is sent with the default subject and body.

Any help would be appreciated.

Asked Aug 07 '18 by Sandy

People also ask

What are SLA misses in Airflow?

Airflow SLAs are a type of notification that you can use if your tasks are taking longer than expected to complete. If a task takes longer than a maximum amount of time to complete as defined in the SLA, the SLA will be missed and notifications will be triggered.
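The SLA mechanism described above can be sketched as a small DAG. This is a minimal sketch, not code from the question: the names sla_demo, sla_miss_alert, and slow_task are hypothetical, and the 30-minute SLA is an arbitrary example value.

```python
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    # DAG-level callback: invoked once per batch of missed SLAs,
    # with task_list naming the tasks that ran late.
    print("SLA missed for tasks: {}".format(task_list))


dag = DAG(
    dag_id='sla_demo',
    default_args={'start_date': airflow.utils.dates.days_ago(1)},
    schedule_interval='@daily',
    sla_miss_callback=sla_miss_alert,
)

# If this task finishes more than 30 minutes after the scheduled time,
# the SLA is missed and sla_miss_alert fires (a notification email is
# also sent to the addresses in the task's 'email' setting).
slow_task = BashOperator(
    task_id='slow_task',
    bash_command='sleep 10',
    sla=timedelta(minutes=30),
    dag=dag,
)
```

Note that sla_miss_callback is set on the DAG, while the sla timedelta is set per task.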

How do I get notified when an Airflow DAG fails?

Trigger the DAG from the Airflow UI; if it fails, an email notification is sent. Since Airflow is commonly used to run many batch jobs in production, setting up alert notifications for all of your jobs is highly recommended, and email is the most popular notification channel.

How do I send an email when an Airflow job fails?

Airflow comes with the email_on_failure property, which you can set to send an email when a job fails; you do not need any external operator to perform the task.



1 Answer

I'm using on_failure_callback for this. Please note that it is triggered for every failed task in the DAG.

import logging

from airflow.operators.email_operator import EmailOperator


def report_failure(context):
    task_instance = context['task_instance']
    dag_id = context['dag'].dag_id
    # Include this check if you only want one email per DAG run:
    # the first failing task records a flag in XCom, later failures skip sending.
    if task_instance.xcom_pull(task_ids=None, dag_id=dag_id, key=dag_id):
        logging.info("Another failing task has already been notified.")
        return
    task_instance.xcom_push(key=dag_id, value=True)
    send_email = EmailOperator(
        task_id='failure_email',
        # recipient/subject/body here mirror the example values in the question
        to=['[email protected]'],
        subject='Failure in DAG {}'.format(dag_id),
        html_content='<h3>Task {} failed</h3>'.format(task_instance.task_id))
    send_email.execute(context)


dag = DAG(
    ...,
    default_args={
        ...,
        "on_failure_callback": report_failure
    }
)
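As an alternative to instantiating an EmailOperator inside the callback, Airflow also ships a plain airflow.utils.email.send_email helper, which uses the SMTP settings from airflow.cfg. The sketch below is an assumption-laden illustration, not code from the answer: the names notify_failure and build_failure_subject are hypothetical, and the recipient address is taken from the question's example.

```python
import logging


def build_failure_subject(dag_id, task_id):
    # Hypothetical helper: build a dynamic subject line from context values.
    return "Airflow alert: task {} failed in DAG {}".format(task_id, dag_id)


def notify_failure(context):
    # Imported lazily so the helper above can be tested without Airflow installed.
    from airflow.utils.email import send_email

    ti = context["task_instance"]
    subject = build_failure_subject(ti.dag_id, ti.task_id)
    body = "<h3>{}</h3><p>Execution date: {}</p>".format(
        subject, context["execution_date"])
    # send_email reads the [smtp] section of airflow.cfg for the mail server.
    send_email(to=["[email protected]"], subject=subject, html_content=body)
    logging.info("Failure notification sent: %s", subject)
```

notify_failure would be wired up the same way, via "on_failure_callback" in default_args, and because the context dict carries the task instance and execution date, both the subject and the body can be built dynamically per failure.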
Answered Sep 18 '22 by Justinas Marozas