I want to delete a DAG from the Airflow UI, that's not longer available in the GCS/dags
folder.
I know that Airflow has a "new" way to remove dags from the DB using
airflow delete_dag my_dag_id
command, seen in https://stackoverflow.com/a/49683543/5318634
It seems that in composer airflow version the delete_dag
command is not yet supported.
Do not try this: I've also tried using airflow resetdb
and the airflow UI died
Is there a way to delete the dags that are not currently in the gs://BUCKET/dags/
folder ?
I created a DAG to cleanup UI. It reads afDagID
from Airflow Variables
from airflow import DAG
from airflow import models
from airflow.operators.mysql_operator import MySqlOperator
import logging
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
dag = DAG('ManageAirFlow', description='Deletes Airflow DAGs from backend: Uses vars- afDagID',
schedule_interval=None,
start_date=datetime(2018, 3, 20), catchup=False)
DeleteXComOperator = MySqlOperator(
task_id='delete-xcom-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from xcom where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)
DeleteTaskOperator = MySqlOperator(
task_id='delete-task-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from task_instance where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)
DeleteSLAMissOperator = MySqlOperator(
task_id='delete-sla-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from sla_miss where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)
DeleteLogOperator = MySqlOperator(
task_id='delete-log-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from log where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)
DeleteJobOperator = MySqlOperator(
task_id='delete-job-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from job where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)
DeleteDagRunOperator = MySqlOperator(
task_id='delete-dag_run-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from dag_run where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)
DeleteDagOperator = MySqlOperator(
task_id='delete-dag-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from dag where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)
DeleteXComOperator >> DeleteTaskOperator >> DeleteSLAMissOperator >> DeleteLogOperator >> DeleteJobOperator >> DeleteDagRunOperator >> DeleteDagOperator
As cloud composer is using the latest stable version i.e. 1.9.0
, the feature to delete dag is not available.
However,
There are few instructions in the docs to delete a dag as below:
gcloud beta composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_NAME.py
but unfortunately, this would not remove the DAG from the Airflow web interface.
More info: https://cloud.google.com/composer/docs/how-to/using/managing-dags#deleting_a_dag
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With