Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Airflow: Get status of a task from an external DAG

Tags:

airflow

I want to get the status of a task from an external DAG. I have the same tasks running in 2 different DAGs based on some conditions. So, I want to check the status of this task in DAG2 from DAG1. If the task status is 'running' in DAG2, then I will skip this task in DAG1.

I tried using:

dag_runs = DagRun.find(dag_id=dag_id,execution_date=exec_dt)
for dag_run in dag_runs:
  dag_run.state

I couldn't figure out if we can get task status using DagRun. If I use TaskDependencySensor, the DAG will have to wait until it finds the allowed_states of the task.

Is there a way to get the current status of a task in another DAG?

like image 798
sks Avatar asked Sep 19 '25 11:09

sks


1 Answers

I used below code to get the status of a task from another DAG:

from airflow.api.common.experimental.get_task_instance import get_task_instance

def get_dag_state(execution_date, **kwargs):
    ti = get_task_instance('dag_id', 'task_id', execution_date)
    task_status = ti.current_state()
    return task_status

dag_status = BranchPythonOperator(
    task_id='dag_status',
    python_callable=get_dag_state,
    dag=dag
    )

More details can be found here

like image 175
sks Avatar answered Sep 21 '25 09:09

sks