I am using Airflow to schedule batch jobs. I have one DAG (A) that runs every night and another DAG (B) that runs once per month. B depends on A having completed successfully. However B takes a long time to run and so I would like to keep it in a separate DAG to allow better SLA reporting.
How can I make running DAG B dependent on a successful run of DAG A on the same day?
This post shows how to create those dependencies even if you don't control the upstream DAGs: add a new DAG that uses the ExternalTaskSensor (one sensor per upstream DAG), encode the dependencies between the DAGs as dependencies between the sensor tasks, and run the DAG encoding the dependencies in the same ...
The 'G' in DAG is 'Graph'. A Graph is a collection of nodes and edges that connect the nodes. For our purposes, each node would be a task, and each edge would be a dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a direction associated with it.
According to the official Airflow docs: "The task instances directly upstream from the task need to be in a success state." Also, if you have set depends_on_past=True, the previous task instance needs to have succeeded (except on the first run for that task).
You can achieve this behavior using an operator called ExternalTaskSensor: your task B1 in DAG B will be scheduled and will wait for a success state on task A2 in DAG A.
External Task Sensor documentation
It looks like a TriggerDagRunOperator can be used as well, and you can use a Python callable to add some logic, as explained here: https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
When a cross-DAG dependency is needed, there are often two requirements.

First, task B1 on DAG B needs to run after task A1 on DAG A is done. This can be achieved using ExternalTaskSensor, as others have mentioned:

    B1 = ExternalTaskSensor(task_id="B1",
                            external_dag_id="A",
                            external_task_id="A1",
                            mode="reschedule")
Second, when a user clears task A1 on DAG A, we want Airflow to also clear task B1 on DAG B so that it re-runs. This can be achieved using ExternalTaskMarker (available since Airflow v1.10.8):

    A1 = ExternalTaskMarker(task_id="A1",
                            external_dag_id="B",
                            external_task_id="B1")
Please see the documentation on cross-DAG dependencies for more details: https://airflow.apache.org/docs/stable/howto/operator/external.html