Airflow: how to delete a DAG?

Tags: airflow

I have started the Airflow webserver and scheduled some DAGs. I can see the DAGs in the web GUI.

How can I stop a particular DAG from being run and remove it from the web GUI? Is there an Airflow CLI command to do that?

I looked around but could not find an answer for a simple way of deleting a DAG once it has been loaded and scheduled.

subba asked Nov 17 '16 09:11



2 Answers

Edit 8/27/18 - Airflow 1.10 is now released on PyPI!

https://pypi.org/project/apache-airflow/1.10.0/


How to delete a DAG completely

We have this feature now in Airflow ≥ 1.10!

PR #2199 (Jira: AIRFLOW-1002), which adds DAG removal to Airflow, has now been merged. It allows fully deleting a DAG's entries from all of the related tables.

The core delete_dag(...) code is now part of the experimental API, and there are entrypoints available via the CLI and also via the REST API.

CLI (Airflow 1.10 syntax; in Airflow 2.x the equivalent command is airflow dags delete my_dag_id):

airflow delete_dag my_dag_id 

REST API (running webserver locally):

curl -X "DELETE" http://127.0.0.1:8080/api/experimental/dags/my_dag_id 

Warning regarding the REST API: Ensure that your Airflow cluster uses authentication in production.
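
For scripted use, the same DELETE call can be made from Python's standard library. This is a minimal sketch, not part of the original answer: the helper names build_delete_request and delete_dag are hypothetical, and it assumes a locally running Airflow 1.10 webserver whose experimental API is reachable without authentication.

```python
import urllib.request
from urllib.parse import quote


def build_delete_request(base_url, dag_id):
    """Build (but do not send) a DELETE request for the experimental endpoint."""
    # quote() escapes characters in the DAG id that are not URL-safe.
    url = "{}/api/experimental/dags/{}".format(
        base_url.rstrip("/"), quote(dag_id, safe="")
    )
    return urllib.request.Request(url, method="DELETE")


def delete_dag(base_url, dag_id, timeout=10):
    """Send the request and return the HTTP status code (hypothetical helper)."""
    request = build_delete_request(base_url, dag_id)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


if __name__ == "__main__":
    # Only builds and prints the request; call delete_dag(...) to actually send it.
    req = build_delete_request("http://127.0.0.1:8080", "my_dag_id")
    print(req.get_method(), req.full_url)
```

Building the request separately from sending it makes it possible to inspect the URL before anything is deleted.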

Installing / upgrading to Airflow 1.10 (current)

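To upgrade, first set one of the following environment variables; Airflow 1.10.0 requires an explicit choice about its GPL-licensed unidecode dependency. Either: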

export SLUGIFY_USES_TEXT_UNIDECODE=yes 

or:

export AIRFLOW_GPL_UNIDECODE=yes 

Then:

pip install -U apache-airflow 

Remember to check UPDATING.md first for the full details!

Taylor D. Edmiston answered Sep 29 '22 09:09


This is my adapted code using PostgresHook with the default connection_id.

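import sys

from airflow.hooks.postgres_hook import PostgresHook

# DAG id to purge, passed as the first command-line argument
dag_input = sys.argv[1]
hook = PostgresHook(postgres_conn_id="airflow_db")

# Delete the DAG's rows from each metadata table, finishing with the dag table itself
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag"]:
    # Table names come from the fixed list above; the dag_id is bound as a query parameter
    sql = "delete from {} where dag_id = %s".format(t)
    hook.run(sql, autocommit=True, parameters=(dag_input,))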
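
To see what this table-by-table purge does without touching a real Airflow database, here is a minimal sketch that runs the same pattern against an in-memory SQLite database. The single-column schemas are simplified stand-ins, not Airflow's actual metadata schema, and purge_dag is a hypothetical helper name.

```python
import sqlite3

# Table names taken from the list in the answer; schemas below are stand-ins.
TABLES = ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag"]


def purge_dag(conn, dag_id, tables=TABLES):
    """Delete every row for dag_id from each table; return rows deleted per table."""
    deleted = {}
    for table in tables:
        # Table names come from a fixed list; only the dag_id is bound as a parameter.
        cur = conn.execute(
            "DELETE FROM {} WHERE dag_id = ?".format(table), (dag_id,)
        )
        deleted[table] = cur.rowcount
    conn.commit()
    return deleted


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    for table in TABLES:
        conn.execute("CREATE TABLE {} (dag_id TEXT)".format(table))
        conn.executemany(
            "INSERT INTO {} VALUES (?)".format(table),
            [("my_dag_id",), ("other_dag",)],
        )
    # Rows for "other_dag" are left untouched.
    print(purge_dag(conn, "my_dag_id"))
```

Returning the per-table row counts gives a quick check that the purge hit the tables you expected before you run the real script against the metadata database.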
Jesus Carpintero answered Sep 29 '22 08:09