I have a huge JSON file in XCom that I no longer need once the DAG execution has finished, but I still see the XCom object in the UI with all the data. Is there any way to delete the XCom programmatically once the DAG run is finished?
Thank you
Below is the code that worked for me. It deletes the XComs of all tasks in the DAG (add task_id to the SQL if only a specific task's XCom needs to be deleted):
XComs (short for “cross-communications”) are a mechanism that lets Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines. An XCom is identified by a key (essentially its name), as well as the task_id and dag_id it came from.
You can perform the cleanup programmatically through sqlalchemy so your solution won't break if the database structure changes:
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(session=None):
    # Delete every XCom row that belongs to the given DAG.
    session.query(XCom).filter(XCom.dag_id == "your dag id").delete()
You can also purge old XCom data:
from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func

@provide_session
def cleanup_xcom(session=None):
    # Delete every XCom row whose execution date is on or before the cutoff.
    session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()
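Rather than hard-coding a date, the cutoff could be computed from a retention window. The following is only a sketch: the helper name xcom_cutoff and the 30-day default are assumptions for illustration and are not part of Airflow. The returned datetime would be passed to the filter in place of the literal date.

```python
from datetime import datetime, timedelta, timezone


def xcom_cutoff(max_age_days=30, now=None):
    """Return the timestamp before which XCom rows count as stale.

    `xcom_cutoff` and the 30-day default are illustrative, not Airflow API.
    """
    if max_age_days < 0:
        raise ValueError("max_age_days must be non-negative")
    # Allow `now` to be injected so the function is easy to test.
    now = now or datetime.now(timezone.utc)
    return now - timedelta(days=max_age_days)


# Intended use inside the cleanup function (not executed here):
#   session.query(XCom).filter(XCom.execution_date <= xcom_cutoff(30)).delete()
```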
If you want to purge the XCom once the dag is finished I think the cleanest solution is to use the "on_success_callback" property of the DAG model class:
from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    # "ti" is a TaskInstance object, so dag_id is an attribute, not a dict key.
    dag_id = context["ti"].dag_id
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)
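Note that the callback above removes the XComs of every run of the DAG, not only the run that just finished. If only the finished run should be purged, the execution date can be pulled from the same context. A rough sketch, assuming an Airflow version where both TaskInstance and XCom expose execution_date; run_identity is a hypothetical helper name and the context below is a stand-in built for illustration:

```python
from types import SimpleNamespace


def run_identity(context):
    """Extract (dag_id, execution_date) from a callback context."""
    # `ti` is a TaskInstance object, so its fields are attributes, not dict keys.
    ti = context["ti"]
    return ti.dag_id, ti.execution_date


# Stand-in for the context Airflow passes to the callback.
fake_context = {
    "ti": SimpleNamespace(dag_id="my_dag", execution_date="2019-06-01T00:00:00")
}
dag_id, execution_date = run_identity(fake_context)

# Intended use in cleanup_xcom (not executed here):
#   session.query(XCom).filter(XCom.dag_id == dag_id,
#                              XCom.execution_date == execution_date).delete()
```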
You have to add a task, chosen according to your metadata database (SQLite, PostgreSQL, MySQL, ...), that deletes the XCom once the DAG run is finished.
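# Airflow 1.10 import path; in Airflow 2 the operator lives in the Postgres provider package.
from airflow.operators.postgres_operator import PostgresOperator

delete_xcom_task = PostgresOperator(
    task_id='delete-xcom-task',
    postgres_conn_id='airflow_db',
    # Values are quoted and dag_id is templated so the rendered SQL is valid.
    sql="delete from xcom where dag_id='{{ dag.dag_id }}' and "
        "task_id='your_task_id' and execution_date='{{ ds }}'",
    dag=dag)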
You can verify your query before you run the dag.
Data Profiling -> Ad Hoc Query -> airflow_db -> query -> Run!
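Another way to check the DELETE before pointing it at the real metadata database is to try it on a scratch table. This is only a sketch under stated assumptions: it uses an in-memory SQLite database, the table contains just the columns the query touches (the real xcom table has more), and delete_xcom is a hypothetical helper name.

```python
import sqlite3

# Scratch table with only the columns the DELETE filters on plus key/value;
# this schema is a simplification, not the real Airflow xcom table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE xcom (dag_id TEXT, task_id TEXT, execution_date TEXT, "
    "key TEXT, value TEXT)"
)
rows = [
    ("my_dag", "your_task_id", "2019-06-01", "return_value", "{}"),
    ("my_dag", "your_task_id", "2019-06-02", "return_value", "{}"),
    ("my_dag", "other_task", "2019-06-01", "return_value", "{}"),
    ("other_dag", "your_task_id", "2019-06-01", "return_value", "{}"),
]
conn.executemany("INSERT INTO xcom VALUES (?, ?, ?, ?, ?)", rows)


def delete_xcom(conn, dag_id, task_id, execution_date):
    """Delete matching rows and return how many were removed."""
    cur = conn.execute(
        "DELETE FROM xcom WHERE dag_id = ? AND task_id = ? "
        "AND execution_date = ?",
        (dag_id, task_id, execution_date),
    )
    conn.commit()
    return cur.rowcount


deleted = delete_xcom(conn, "my_dag", "your_task_id", "2019-06-01")
remaining = conn.execute("SELECT COUNT(*) FROM xcom").fetchone()[0]
print(deleted, remaining)
```

Only the row matching all three conditions should disappear; rows from other tasks, other DAGs, or other execution dates should be left alone.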