I want to get the status of a task from an external DAG. I have the same tasks running in 2 different DAGs based on some conditions. So, I want to check the status of this task in DAG2 from DAG1. If the task status is 'running' in DAG2, then I will skip this task in DAG1.
I tried using:
dag_runs = DagRun.find(dag_id=dag_id, execution_date=exec_dt)
for dag_run in dag_runs:
    dag_run.state
I couldn't figure out if we can get task status using DagRun. If I use TaskDependencySensor, the DAG will have to wait until it finds the allowed_states of the task.
Is there a way to get the current status of a task in another DAG?
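On the DagRun part of the question: a DagRun object exposes get_task_instance(task_id), which returns the task instance for that run (or None), so the loop above can be extended to read a task's state rather than the run's state. Below is a minimal sketch under stated assumptions: the helper name task_state_in_runs and the ids "dag2" and "my_task" are hypothetical, and the helper only relies on each run behaving like a DagRun, so the Airflow wiring is shown in comments and should be checked against the Airflow version in use.

```python
from typing import Iterable, Optional


def task_state_in_runs(dag_runs: Iterable, task_id: str) -> Optional[str]:
    """Return the state of task_id in the first run that has it, else None.

    Each element of dag_runs is expected to behave like airflow.models.DagRun:
    it offers get_task_instance(task_id), which returns an object with a
    .state attribute, or None when the run has no such task instance.
    """
    for dag_run in dag_runs:
        ti = dag_run.get_task_instance(task_id)
        if ti is not None:
            return ti.state
    return None


# Inside Airflow this might be called roughly as follows (not executed here;
# "dag2" and "my_task" are placeholder ids):
#   from airflow.models import DagRun
#   runs = DagRun.find(dag_id="dag2", execution_date=exec_dt)
#   state = task_state_in_runs(runs, "my_task")
```

Because this is a single lookup, it does not block the way a sensor does; it reports whatever state is recorded at the moment it runs.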
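I used the code below to get the status of a task from another DAG:
from airflow.api.common.experimental.get_task_instance import get_task_instance
# Airflow 1.10 import path; in Airflow 2.x the operator lives in airflow.operators.python
from airflow.operators.python_operator import BranchPythonOperator

def get_dag_state(execution_date, **kwargs):
    # Look up the instance of 'task_id' in 'dag_id' for the same execution date
    ti = get_task_instance('dag_id', 'task_id', execution_date)
    # current_state() reads the latest state from the metadata database
    task_status = ti.current_state()
    # Note: BranchPythonOperator follows the task_id(s) its callable returns,
    # so in a real DAG map this state to a downstream task_id before returning
    return task_status

dag_status = BranchPythonOperator(
    task_id='dag_status',
    python_callable=get_dag_state,
    provide_context=True,  # needed on Airflow 1.10 so execution_date is passed in
    dag=dag
)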
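One caveat with the snippet above: BranchPythonOperator treats the value returned by its callable as the task_id (or list of task_ids) to follow, so a raw state string such as 'running' has to be translated into a downstream task_id. A minimal sketch of that mapping, assuming two hypothetical downstream tasks named 'run_task' and 'skip_task' (neither appears in the original post):

```python
from typing import Optional


def pick_branch(other_task_state: Optional[str],
                run_task_id: str = "run_task",
                skip_task_id: str = "skip_task") -> str:
    """Map the other DAG's task state to the task_id the branch should follow.

    run_task_id and skip_task_id are placeholder names for tasks that would
    sit directly downstream of the branching operator.
    """
    # Skip our copy of the task only while the other DAG is running it.
    if other_task_state == "running":
        return skip_task_id
    # Any other state (including None for "no state recorded") runs the task.
    return run_task_id


# Inside the branch callable this might be used roughly as (not executed here):
#   ti = get_task_instance('dag_id', 'task_id', execution_date)
#   return pick_branch(ti.current_state())
```

Treating every state other than 'running' as "go ahead" is a design choice made for this sketch; states such as 'queued' or 'up_for_retry' may deserve the skip branch too, depending on the conditions described in the question.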