Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Airflow UI link in SlackAPIPostOperator?

Tags:

slack

airflow

Im using SlackAPIPostOperator in Airflow to send Slack messages on task failures. I wondered if there's a smart way to add a link to the airflow UI logs page of the failed task to the slack message.

The following is an example I want to achieve:

http://myserver-uw1.myaws.com:8080/admin/airflow/graph?execution_date=...&arrange=LR&root=&dag_id=MyDAG&_csrf_token=mytoken

The current message is:

def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']))
    return failed_alert.execute(context=context)
like image 201
Aviv Oron Avatar asked Jan 08 '18 15:01

Aviv Oron


People also ask

How do I get to the airflow UI?

To access your Apache Airflow UIOpen the Environments page on the Amazon MWAA console. Choose an environment. Choose Open Airflow UI.

Does Apache airflow have a UI?

A notable feature of Apache Airflow is the user interface (UI), which provides insights into your DAGs and DAG runs. The UI is a useful tool for understanding, monitoring, and troubleshooting your pipelines. This guide is an overview of some of the most useful features and visualizations in the Airflow UI.

How do I create a slack connection in airflow?

Go to api.slack.com/apps and click on “Create New App” for your airflow instance to access. Upon exiting the modal, enable incoming webhooks for your slack workspace app. Then, scroll to the bottom and select “Add new webhook to workspace”.


2 Answers

You can build the url to the UI with the config value base_url under the [webserver] section and then use Slack's message format <http://example.com|stuff> for links.

from airflow import configuration

def slack_failed_task(context):
    link = '<{base_url}/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}|logs>'.format(
        base_url=configuration.get('webserver', 'BASE_URL'),
        dag_id=context['dag'].dag_id,
        task_id=context['task_instance'].task_id,
        execution_date=context['ts']))  # equal to context['execution_date'].isoformat())

    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']) +
             '\nSee ' + link + ' to debug')
    return failed_alert.execute(context=context)
like image 140
Daniel Huang Avatar answered Dec 10 '22 07:12

Daniel Huang


We can also do this using the log_url attribute in the Task Instance

def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' + 
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']) + 
             '\nLogs: <{url}|to Airflow UI>'.format(url=context['task_instance'].log_url)
    )
    return failed_alert.execute(context=context)

I know this is available since version 1.10.4 at the very least.

like image 36
Tanjin Avatar answered Dec 10 '22 09:12

Tanjin