
How to set dependencies between DAGs in Airflow?

I am using Airflow to schedule batch jobs. I have one DAG (A) that runs every night and another DAG (B) that runs once per month. B depends on A having completed successfully. However, B takes a long time to run, so I would like to keep it in a separate DAG to allow better SLA reporting.

How can I make running DAG B dependent on a successful run of DAG A on the same day?

Conor asked Jun 24 '16 21:06




3 Answers

You can achieve this behavior with a sensor called ExternalTaskSensor. Task B1 in DAG B will be scheduled and will then wait for task A2 in DAG A to succeed.

External Task Sensor documentation
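One caveat: by default ExternalTaskSensor looks for the external task at the same logical (execution) date as the sensor's own run, and a nightly DAG and a monthly DAG generally do not share logical dates. The sensor accepts `execution_delta` or `execution_date_fn` to bridge that gap. Here is a minimal sketch of such a function, assuming both DAGs are scheduled at midnight (`@daily` and `@monthly`) and follow the classic convention where the logical date is the start of the schedule interval. The function name is hypothetical, and only the standard-library `datetime` module is used.

```python
from datetime import datetime, timedelta


def last_daily_run_of_month(logical_date):
    """Map a monthly run's logical date to the logical date of the
    daily run that starts at the same wall-clock time.

    Assumption: the monthly run's logical date is the 1st of the month
    at midnight, and the run actually starts on the 1st of the next month.
    """
    # First day of the following month (handles the December rollover).
    if logical_date.month == 12:
        next_month = logical_date.replace(year=logical_date.year + 1, month=1, day=1)
    else:
        next_month = logical_date.replace(month=logical_date.month + 1, day=1)
    # The daily run that starts then has a logical date one day earlier.
    return next_month - timedelta(days=1)


if __name__ == "__main__":
    print(last_daily_run_of_month(datetime(2016, 6, 1)))
```

It would be passed to the sensor in DAG B as `execution_date_fn=last_daily_run_of_month`. If your schedules differ from the assumptions above, the mapping has to change accordingly.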

p.magalhaes answered Sep 23 '22 06:09


It looks like a TriggerDagRunOperator can be used as well, and you can use a Python callable to add some logic, as explained here: https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
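The kind of logic such a callable might hold can be sketched in plain Python. The example assumes DAG A is a midnight `@daily` DAG and that B should only be triggered from A's last run of each month; the function name is hypothetical. Note that the `python_callable` argument of TriggerDagRunOperator exists in Airflow 1.x but was removed in 2.0, where a predicate like this would more likely sit in a ShortCircuitOperator placed upstream of the trigger task.

```python
from datetime import datetime, timedelta


def is_last_daily_run_of_month(logical_date):
    """Return True when a daily run's logical date is the last day of its month.

    Assumption: the logical date marks the start of the daily interval,
    so the last run of the month is the one whose next day is the 1st.
    """
    return (logical_date + timedelta(days=1)).day == 1


if __name__ == "__main__":
    # Walk through the end of June 2016 and show which run would trigger B.
    for day in (28, 29, 30):
        date = datetime(2016, 6, day)
        print(date.date(), is_last_daily_run_of_month(date))
```

Keeping the check as a plain function makes it easy to unit test without an Airflow installation.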

nono answered Sep 20 '22 06:09


When cross-DAG dependency is needed, there are often two requirements:

  1. Task B1 on DAG B needs to run after task A1 on DAG A is done. This can be achieved using ExternalTaskSensor as others have mentioned:

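    # Airflow 2.x import path; on 1.10.x use airflow.sensors.external_task_sensor
    from airflow.sensors.external_task import ExternalTaskSensor

    # By default, waits for task A1 in DAG A at the same logical date
    B1 = ExternalTaskSensor(task_id="B1",
                            external_dag_id="A",
                            external_task_id="A1",
                            mode="reschedule")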
    
  2. When a user clears task A1 on DAG A, we want Airflow to clear task B1 on DAG B as well so that it re-runs. This can be achieved using ExternalTaskMarker (available since Airflow v1.10.8).

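    # Airflow 2.x import path; on 1.10.x use airflow.sensors.external_task_sensor
    from airflow.sensors.external_task import ExternalTaskMarker

    A1 = ExternalTaskMarker(task_id="A1",
                            external_dag_id="B",
                            external_task_id="B1")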
    

Please see the doc about cross-DAG dependencies for more details: https://airflow.apache.org/docs/stable/howto/operator/external.html

doraemon answered Sep 23 '22 06:09