I'm using SlackAPIPostOperator in Airflow to send Slack messages on task failures.
Is there a smart way to add a link to the Airflow UI logs page of the failed task to the Slack message?
The following is an example of the kind of link I want to achieve:
http://myserver-uw1.myaws.com:8080/admin/airflow/graph?execution_date=...&arrange=LR&root=&dag_id=MyDAG&_csrf_token=mytoken
The current message is:
def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' +
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']))
    return failed_alert.execute(context=context)
You can build the URL to the UI from the config value base_url under the [webserver] section, and then use Slack's message format <http://example.com|stuff> for links.
from airflow import configuration
# Import path for Airflow 1.10.x
from airflow.operators.slack_operator import SlackAPIPostOperator

def slack_failed_task(context):
    # Slack link syntax is <url|label>
    link = '<{base_url}/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}|logs>'.format(
        base_url=configuration.get('webserver', 'BASE_URL'),
        dag_id=context['dag'].dag_id,
        task_id=context['task_instance'].task_id,
        execution_date=context['ts'])  # equal to context['execution_date'].isoformat()
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' +
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']) +
             '\nSee ' + link + ' to debug')
    return failed_alert.execute(context=context)
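One caveat with building the link by hand: the execution date usually contains characters such as "+" and ":" that should be escaped in a query string. Here is a minimal sketch of a helper that does the escaping with the standard library. The function name build_log_link and the localhost base URL are made up for illustration, and the /admin/airflow/log path is simply the one used above, so it may differ in other Airflow versions.

```python
from urllib.parse import urlencode


def build_log_link(base_url, dag_id, task_id, execution_date, label="logs"):
    """Return a Slack-formatted link (<url|label>) to a task's log page.

    Hypothetical helper: the /admin/airflow/log path is taken from the
    answer above and is an assumption for other Airflow versions.
    """
    # urlencode escapes characters such as '+' and ':' in the timestamp,
    # which could otherwise be misread when the query string is parsed.
    query = urlencode({
        "dag_id": dag_id,
        "task_id": task_id,
        "execution_date": execution_date,
    })
    url = "{}/admin/airflow/log?{}".format(base_url.rstrip("/"), query)
    return "<{}|{}>".format(url, label)


link = build_log_link(
    "http://localhost:8080",  # placeholder for the [webserver] base_url value
    "MyDAG",
    "my_task",
    "2019-01-01T00:00:00+00:00",  # e.g. context['ts']
)
print(link)
```

Inside the callback you would pass configuration.get('webserver', 'BASE_URL'), context['dag'].dag_id, context['task_instance'].task_id and context['ts'] instead of the placeholder values.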
You can also do this using the log_url attribute of the task instance:
def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#mychannel",
        token="...",
        text=':red_circle: Failure on: ' +
             str(context['dag']) +
             '\nRun ID: ' + str(context['run_id']) +
             '\nTask: ' + str(context['task_instance']) +
             '\nLogs: <{url}|to Airflow UI>'.format(url=context['task_instance'].log_url)
    )
    return failed_alert.execute(context=context)
I know this is available since version 1.10.4 at the very least.
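If you are not sure whether your Airflow version has log_url, one option is to build the message text in a small function that only adds the link when the attribute exists. The sketch below is an illustration, not part of Airflow: failure_text is a made-up name, it uses dag_id and task_id rather than str() of the objects, and the context is a stand-in built from SimpleNamespace so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace


def failure_text(context):
    """Build the Slack message text from an Airflow-style context dict."""
    ti = context["task_instance"]
    lines = [
        ":red_circle: Failure on: {}".format(context["dag"].dag_id),
        "Run ID: {}".format(context["run_id"]),
        "Task: {}".format(ti.task_id),
    ]
    # Only add the link if the task instance exposes log_url.
    log_url = getattr(ti, "log_url", None)
    if log_url:
        lines.append("Logs: <{}|to Airflow UI>".format(log_url))
    return "\n".join(lines)


# Stand-in context for demonstration; Airflow passes the real one to the callback.
demo_context = {
    "dag": SimpleNamespace(dag_id="MyDAG"),
    "run_id": "scheduled__2019-01-01T00:00:00+00:00",
    "task_instance": SimpleNamespace(
        task_id="my_task",
        log_url="http://localhost:8080/log?placeholder",  # placeholder URL
    ),
}
print(failure_text(demo_context))
```

In the callback you would then pass text=failure_text(context) to SlackAPIPostOperator. The callback itself is typically registered through the on_failure_callback argument of a task, or in the DAG's default_args.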