Airflow sla_miss_callback function not triggering

I have been trying to get a Slack message callback to trigger on SLA misses. I've noticed that:

  1. SLA misses get registered successfully in the Airflow web UI at slamiss/list/

  2. on_failure_callback works successfully

However, the sla_miss_callback function itself never gets triggered.

What I've tried:

  • Different combinations of adding sla and sla_miss_callback at the default_args level, the DAG level, and the task level

  • Checking logs on our scheduler and workers for SLA-related messages, but we haven't seen anything

  • The Slack message callback function works if called from any other basic task or function

import logging
import time
from datetime import timedelta

import airflow
import airflow.utils.dates
from airflow.operators.python_operator import PythonOperator

# send_task_failed_msg_to_slack and send_sla_miss_message_to_slack are our Slack
# helper functions (defined elsewhere; they work when called from ordinary tasks).

LOGGER = logging.getLogger(__name__)

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(n=0, minute=1),
    "on_failure_callback": send_task_failed_msg_to_slack,
    "sla": timedelta(minutes=1),
    "retries": 0,
    "pool": "canary",
    "priority_weight": 1,
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

def sleep():
    """ Sleep for 90 seconds (longer than the 1-minute SLA) """
    time.sleep(90)
    LOGGER.info("Slept for 90 seconds")

def simple_print(**context):
    """ Prints a message """
    print("Hello World!")


sleep_task = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag
)

simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag
)

sleep_task >> simple_task
Asked by Allen Lee on Jan 29 '20


2 Answers

I was in a similar situation once.
On investigating the scheduler log, I found the following error:

[2020-07-08 09:14:32,781] {scheduler_job.py:534} INFO -  --------------> ABOUT TO CALL SLA MISS CALL BACK  
[2020-07-08 09:14:32,781] {scheduler_job.py:541} ERROR - Could not call sla_miss_callback for DAG 
sla_miss_alert() takes 1 positional arguments but 5 were given

The problem is that your sla_miss_callback function expects only one argument, while the scheduler actually calls it with five. Its signature should look like this:

def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Function that alerts me that dag_id missed sla"""
    # <function code here>

For reference, check out the Airflow source code.

Note: Don't put sla_miss_callback=sla_miss_alert in default_args. It should be defined in the DAG definition itself.
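
Putting those two points together, a minimal sketch of a correctly wired DAG (hypothetical names; your actual Slack-sending logic goes inside the callback) could look like this:

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago


def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called by the scheduler with five arguments when a task in this DAG misses its SLA."""
    # e.g. call send_sla_miss_message_to_slack(...) here
    print(f"SLA missed in DAG {dag.dag_id}: {task_list}")


dag = DAG(
    dag_id="sla_test",
    start_date=days_ago(1),
    schedule_interval="*/5 * * * *",
    sla_miss_callback=sla_miss_alert,            # set on the DAG, not in default_args
    default_args={"sla": timedelta(minutes=1)},  # the SLA itself can stay in default_args
)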

Answered by Ankit Anand


Example of using SLA Missed and Execution Timeout alerts:

  • First, you'll get an SLA Missed alert after the task has been running for 2 minutes,
  • and then, after 4 minutes, the task will fail with an Execution Timeout alert.

"sla": timedelta(minutes=2),  # Default Task SLA time
"execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout

Also, you have the log_url right in the message, so you can easily open the task log in Airflow.

Example Slack Message

import time
from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional, Tuple

from airflow import AirflowException
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.exceptions import AirflowTaskTimeout
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator

SLACK_STATUS_TASK_FAILED = ":red_circle: Task Failed"
SLACK_STATUS_EXECUTION_TIMEOUT = ":alert: Task Failed by Execution Timeout."


def send_slack_alert_sla_miss(
        dag: DAG,
        task_list: str,
        blocking_task_list: str,
        slas: List[Tuple],
        blocking_tis: List[TaskInstance],
) -> None:
    """Send `SLA missed` alert to Slack"""
    task_instance: TaskInstance = blocking_tis[0]
    message = dedent(
        f"""
        :warning: Task SLA missed.
        *DAG*: {dag.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {task_instance.execution_date.strftime("%Y-%m-%d %H:%M:%S")} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Task State*: `{task_instance.state}`
        *Blocking Task List*: {blocking_task_list}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(message=message)


def send_slack_alert_task_failed(context: Dict[str, Any]) -> None:
    """Send `Task Failed` notification to Slack"""
    task_instance: TaskInstance = context.get("task_instance")
    exception: AirflowException = context.get("exception")

    status = SLACK_STATUS_TASK_FAILED
    if isinstance(exception, AirflowTaskTimeout):
        status = SLACK_STATUS_EXECUTION_TIMEOUT

    # Prepare formatted Slack message
    message = dedent(
        f"""
        {status}
        *DAG*: {task_instance.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {context.get("execution_date").to_datetime_string()} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Execution Timeout*: {task_instance.task.execution_timeout}
        _** Max time allowed for the execution of this task instance_
        *Task Duration*: {timedelta(seconds=round(task_instance.duration))}
        *Task State*: `{task_instance.state}`
        *Exception*: {exception}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(
        message=message,
        context=context,
    )


def send_slack_alert(
        message: str,
        context: Optional[Dict[str, Any]] = None,
) -> None:
    """Send prepared message to Slack"""
    slack_webhook_token = BaseHook.get_connection("slack").password
    notification = SlackWebhookOperator(
        task_id="slack_notification",
        http_conn_id="slack",
        webhook_token=slack_webhook_token,
        message=message,
        username="airflow",
    )
    notification.execute(context)


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    "email": ["test@test,com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(minutes=2),  # Default Task SLA time
    "execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
    "on_failure_callback": send_slack_alert_task_failed,
}

with DAG(
        dag_id="test_sla",
        schedule_interval="*/5 * * * *",
        start_date=datetime(2021, 1, 11),
        default_args=default_args,
        sla_miss_callback=send_slack_alert_sla_miss,  # Must be set here, not in default_args!
) as dag:
    delay_python_task = PythonOperator(
        task_id="delay_five_minutes_python_task",
        sla=timedelta(minutes=2),
        python_callable=lambda: time.sleep(300),
    )
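
One prerequisite worth noting: send_slack_alert() above reads the webhook token from an Airflow connection with conn_id "slack" (BaseHook.get_connection("slack").password). Here's a rough sketch of creating that connection programmatically, assuming the Airflow 1.10.x metadata API used in the answer; the token value is a placeholder, and adding the connection through the UI or CLI works just as well:

from airflow import settings
from airflow.models import Connection

# Create the "slack" HTTP connection used by send_slack_alert(); the webhook token
# (the part after https://hooks.slack.com/services) goes in the password field.
session = settings.Session()
if not session.query(Connection).filter(Connection.conn_id == "slack").first():
    session.add(
        Connection(
            conn_id="slack",
            conn_type="http",
            host="https://hooks.slack.com/services",
            password="/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",  # placeholder token
        )
    )
    session.commit()
session.close()
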
Answered by Maksym Skorupskyi