I have been trying to get a slack message callback to trigger on SLA misses. I've noticed that:
- SLA misses get registered successfully in the Airflow web UI at `slamiss/list/`
- `on_failure_callback` works successfully

However, the `sla_miss_callback` function itself never gets triggered.
What I've tried:
- Different combinations of adding `sla` and `sla_miss_callback` at the `default_args` level, the DAG level, and the task level
- Checking logs on our scheduler and workers for SLA-related messages (see also here), but we haven't seen anything
- The Slack message callback function works if called from any other basic task or function
# Defaults handed to the DAG below through ``default_args``.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(n=0, minute=1),
    "on_failure_callback": send_task_failed_msg_to_slack,
    "sla": timedelta(minutes=1),
    "retries": 0,
    "pool": "canary",
    "priority_weight": 1,
}

# The SLA-miss callback is passed to the DAG itself, not through default_args.
dag = airflow.DAG(
    dag_id="sla_test",
    schedule_interval="*/5 * * * *",
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    dagrun_timeout=timedelta(minutes=5),
    max_active_runs=1,
    catchup=False,
)
def sleep():
    """Sleep for 2 minutes, then log that the pause has finished."""
    # 2 minutes expressed in seconds; keep in sync with the log message below.
    time.sleep(120)
    LOGGER.info("Slept for 2 minutes")
def simple_print(**context):
    """Write a fixed greeting to standard output.

    Keyword arguments are accepted but not used.
    """
    greeting = "Hello World!"
    print(greeting)
# Bind the operator to its own name so the ``sleep`` callable it wraps is not
# overwritten at module level.
sleep_task = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag,
)
simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag,
)
# simple_task is downstream of the sleep task.
sleep_task >> simple_task
I was in a similar situation once.
On investigating the scheduler log, I found the following error:
[2020-07-08 09:14:32,781] {scheduler_job.py:534} INFO - --------------> ABOUT TO CALL SLA MISS CALL BACK
[2020-07-08 09:14:32,781] {scheduler_job.py:541} ERROR - Could not call sla_miss_callback for DAG
sla_miss_alert() takes 1 positional arguments but 5 were given
The problem is that your `sla_miss_callback` function accepts only 1 argument, but it is called with 5, so it should be defined like this:
def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Alert me that the DAG (dag_id) missed its SLA.

    The callback is invoked with five positional arguments, so all five
    parameters must be accepted even when some of them go unused.
    """
    # <function code here>
For reference, check out the Airflow source code.
Note: Don't put `sla_miss_callback=sla_miss_alert` in `default_args`. It should be defined in the DAG definition itself.
Example of using `SLA missed` and `Execution Timeout` alerts:
an `SLA missed` alert is sent after the task has run for 2 minutes, followed by an `Execution Timeout` alert after 4 minutes.
"sla": timedelta(minutes=2), # Default Task SLA time
"execution_timeout": timedelta(minutes=4), # Default Task Execution Timeout
Also, you have `log_url` right in the message, so you can easily open the task log in Airflow.
Example Slack Message
import time
from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional, Tuple
from airflow import AirflowException
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.exceptions import AirflowTaskTimeout
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator
# Headline placed at the top of the Slack "task failed" notification.
SLACK_STATUS_TASK_FAILED = ":red_circle: Task Failed"
# Headline used instead when the failure exception is an AirflowTaskTimeout.
SLACK_STATUS_EXECUTION_TIMEOUT = ":alert: Task Failed by Execution Timeout."
def send_slack_alert_sla_miss(
    dag: DAG,
    task_list: str,
    blocking_task_list: str,
    slas: List[Tuple],
    blocking_tis: List[TaskInstance],
) -> None:
    """Send `SLA missed` alert to Slack.

    Intended to be used as a DAG-level ``sla_miss_callback``.

    Args:
        dag: DAG whose ``dag_id`` is reported in the alert.
        task_list: Text describing the tasks that missed their SLA
            (presumably pre-formatted by the scheduler; only used in the
            reduced message).
        blocking_task_list: Text describing the blocking tasks; included
            verbatim in the alert.
        slas: SLA records supplied to the callback; not used here.
        blocking_tis: Blocking task instances. The first entry supplies the
            task details and log URL. May be empty.
    """
    if not blocking_tis:
        # With no blocking task instance, ``blocking_tis[0]`` would raise
        # IndexError and the alert would never be sent. Fall back to a
        # reduced message built only from the arguments that are available.
        message = "\n".join(
            [
                "",
                ":warning: Task SLA missed.",
                f"*DAG*: {dag.dag_id}",
                f"*Task List*: {task_list}",
                f"*Blocking Task List*: {blocking_task_list}",
                "",
            ]
        )
        send_slack_alert(message=message)
        return

    task_instance: TaskInstance = blocking_tis[0]
    message = dedent(
        f"""
        :warning: Task SLA missed.
        *DAG*: {dag.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {task_instance.execution_date.strftime("%Y-%m-%d %H:%M:%S")} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Task State*: `{task_instance.state}`
        *Blocking Task List*: {blocking_task_list}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(message=message)
def send_slack_alert_task_failed(context: Dict[str, Any]) -> None:
    """Send `Task Failed` notification to Slack.

    Meant to be used as an ``on_failure_callback``.

    Args:
        context: Callback context; the ``task_instance``, ``exception`` and
            ``execution_date`` entries are read from it, and the whole
            mapping is forwarded to ``send_slack_alert``.
    """
    ti: TaskInstance = context.get("task_instance")
    error: AirflowException = context.get("exception")

    # Execution timeouts get their own headline; any other failure uses the
    # generic one.
    headline = (
        SLACK_STATUS_EXECUTION_TIMEOUT
        if isinstance(error, AirflowTaskTimeout)
        else SLACK_STATUS_TASK_FAILED
    )

    # Assemble the Slack text; dedent strips the indentation of this literal.
    text = dedent(
        f"""
        {headline}
        *DAG*: {ti.dag_id}
        *Task*: {ti.task_id}
        *Execution Time*: {context.get("execution_date").to_datetime_string()} UTC
        *SLA Time*: {ti.task.sla}
        _* Time by which the job is expected to succeed_
        *Execution Timeout*: {ti.task.execution_timeout}
        _** Max time allowed for the execution of this task instance_
        *Task Duration*: {timedelta(seconds=round(ti.duration))}
        *Task State*: `{ti.state}`
        *Exception*: {error}
        *Log URL*: {ti.log_url}
        """
    )
    send_slack_alert(message=text, context=context)
def send_slack_alert(
    message: str,
    context: Optional[Dict[str, Any]] = None,
) -> None:
    """Send prepared message to Slack.

    Args:
        message: Text to post.
        context: Optional context passed straight to the operator's
            ``execute`` call.
    """
    slack_conn_id = "slack"
    # The webhook token is read from the password field of the connection.
    token = BaseHook.get_connection(slack_conn_id).password
    SlackWebhookOperator(
        task_id="slack_notification",
        http_conn_id=slack_conn_id,
        webhook_token=token,
        message=message,
        username="airflow",
    ).execute(context)
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    # Recipient list for failure e-mails (enabled by email_on_failure below).
    "email": ["test@test.com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(minutes=2),  # Default Task SLA time
    "execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
    "on_failure_callback": send_slack_alert_task_failed,
}
with DAG(
    dag_id="test_sla",
    default_args=default_args,
    start_date=datetime(2021, 1, 11),
    schedule_interval="*/5 * * * *",
    # The SLA-miss callback is a DAG argument: it must be set here, not in
    # default_args.
    sla_miss_callback=send_slack_alert_sla_miss,
) as dag:
    # Single task that sleeps for 300 seconds (five minutes).
    delay_python_task = PythonOperator(
        task_id="delay_five_minutes_python_task",
        python_callable=lambda: time.sleep(300),
        # SLA set explicitly on this task.
        sla=timedelta(minutes=2),
    )
If you love us, you can donate to us via PayPal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With