Is there any option to customize the email and send it on any task failure in the DAG? There is an option like 'email_on_failure': True, but it doesn't provide a way to dynamically add content to the email subject or body.
My DAG looks like below:
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
import json
from datetime import timedelta
from datetime import datetime
from airflow.models import Variable
args = {
    'owner': 'airflow',
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'max_active_runs': 10
}
dag = DAG(dag_id='TEST_DAG', default_args=args, schedule_interval='@once')
new_cluster = {
    'spark_version': '4.0.x-scala2.11',
    'node_type_id': 'Standard_D16s_v3',
    'num_workers': 3,
    'spark_conf': {
        'spark.hadoop.javax.jdo.option.ConnectionDriverName': 'org.postgresql.Driver',
        .....
    },
    'custom_tags': {
        'ApplicationName': 'TEST',
        .....
    }
}
t1 = DatabricksSubmitRunOperator(
    task_id='t1',
    dag=dag,
    new_cluster=new_cluster,
    ......
)
t2 = SimpleHttpOperator(
    task_id='t2',
    method='POST',
    ........
)
t2.set_upstream(t1)
t3 = SimpleHttpOperator(
    task_id='t3',
    method='POST',
    .....
)
t3.set_upstream(t2)
send_mail = EmailOperator(
    dag=dag,
    task_id="send_mail",
    to=["[email protected]"],
    subject="Success",
    html_content='<h3>Success</h3>')
send_mail.set_upstream(t3)
In the success case, the send_mail task sends a customized email to the specified email id.
But when any task fails, I want to customize the email and send it to the specified email id. This is not happening; on failure, the email is sent with the default subject and body.
Any help would be appreciated.
I'm using on_failure_callback for this. Please note that it will get triggered for every failed task in a DAG.
import logging
from airflow.operators.email_operator import EmailOperator

def report_failure(context):
    task_instance = context['task_instance']
    dag_id = task_instance.dag_id
    # include this check if you only want to get one email per DAG
    if task_instance.xcom_pull(task_ids=None, dag_id=dag_id, key=dag_id) == True:
        logging.info("Other failing task has been notified.")
        return
    # mark this DAG run as notified so other failing tasks skip the email
    task_instance.xcom_push(key=dag_id, value=True)
    send_email = EmailOperator(...)
    send_email.execute(context)

dag = DAG(
    ...,
    default_args={
        ...,
        "on_failure_callback": report_failure
    }
)
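If you also want the subject and body to change per failure (the original question), you can build them from the context dict inside the callback and pass them to Airflow's send_email helper instead of constructing an EmailOperator. This is only a minimal sketch: the recipient address and HTML layout are placeholders, and context entries such as 'exception' and the task instance's log_url are assumed to be available in your Airflow version.

from airflow.utils.email import send_email

def report_failure(context):
    # build a dynamic subject and body from the failed task's context
    ti = context['task_instance']
    subject = "[airflow] DAG {dag} failed on task {task}".format(
        dag=ti.dag_id, task=ti.task_id)
    html_content = """
        <h3>Task Failed</h3>
        <p>DAG: {dag}</p>
        <p>Task: {task}</p>
        <p>Execution date: {exec_date}</p>
        <p>Exception: {exc}</p>
        <p>Log: <a href="{log_url}">{log_url}</a></p>
    """.format(
        dag=ti.dag_id,
        task=ti.task_id,
        exec_date=context.get('execution_date'),
        exc=context.get('exception'),
        log_url=ti.log_url,
    )
    # send_email reuses the [smtp] settings already configured for email_on_failure
    send_email(to=['[email protected]'], subject=subject, html_content=html_content)

Because send_email goes through the same SMTP configuration as the built-in email_on_failure alerts, nothing extra has to be set up beyond what already works for the default notifications.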