Is there a way in Airflow to use `depends_on_past` for an entire DagRun, not just for a single Task?
I have a daily DAG, and the Friday DagRun errored on the 4th task; however, the Saturday and Sunday DagRuns still ran as scheduled. Using `depends_on_past = True` would have paused the DagRun on the same 4th task, but the first 3 tasks would still have run.
I can see that the DagRun DB table has a `state` column that contains `failed` for the Friday DagRun. What I want is a way of configuring a DagRun not to start at all if the previous DagRun failed, rather than starting and running until it reaches a task that previously failed.
Does anyone know if this is possible?
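To pin down the behaviour being asked for, here is a pure-Python model of a DagRun-level gate. It is not Airflow code: `DagRunRecord`, `previous_run` and `should_start` are hypothetical names, the records only stand in for rows of the `dag_run` table, and "previous" is assumed to mean the run with the latest earlier execution date.

```python
from dataclasses import dataclass
from datetime import date
from typing import List, Optional


@dataclass
class DagRunRecord:
    # Hypothetical stand-in for a dag_run row: logical date plus its state.
    execution_date: date
    state: str  # e.g. "success", "failed", "running"


def previous_run(runs: List[DagRunRecord], current: date) -> Optional[DagRunRecord]:
    """Return the run with the latest execution_date strictly before `current`, if any."""
    earlier = [r for r in runs if r.execution_date < current]
    return max(earlier, key=lambda r: r.execution_date, default=None)


def should_start(runs: List[DagRunRecord], current: date) -> bool:
    """Start only if there is no earlier run, or the latest earlier run succeeded."""
    prev = previous_run(runs, current)
    return prev is None or prev.state == "success"


# Thursday succeeded, Friday failed: Saturday's run should not start at all.
runs = [DagRunRecord(date(2020, 1, 2), "success"), DagRunRecord(date(2020, 1, 3), "failed")]
print(should_start(runs, date(2020, 1, 4)))  # False
```

The point of the model is that the decision is made once per DagRun, before any task runs, which is exactly what per-task `depends_on_past` does not give you.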
`concurrency`: The maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This lets one DAG run 32 tasks at once while another DAG is limited to 16.
According to the official Airflow docs, the task instances directly upstream from a task need to be in a success state. Also, if you have set `depends_on_past=True`, the previous task instance needs to have succeeded (except if it is the first run for that task).
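The quoted rule can be written as a small predicate. This is a simplified model that follows the quoted wording literally for the default "all upstream succeeded" case; `can_run` is a hypothetical name, not Airflow's scheduler code.

```python
from typing import Iterable, Optional


def can_run(upstream_states: Iterable[str],
            depends_on_past: bool,
            previous_state: Optional[str]) -> bool:
    """Model of the quoted readiness rule for one task instance.

    previous_state is the state of this task's instance in the previous run,
    or None when this is the first run for the task.
    """
    # Every task instance directly upstream must be in the success state.
    if not all(state == "success" for state in upstream_states):
        return False
    # With depends_on_past, the previous instance must have succeeded (first run exempt).
    if depends_on_past and previous_state is not None:
        return previous_state == "success"
    return True
```

Note that `previous_state` refers to the same task in the previous run, which is why a failure in the 4th task only blocks the 4th task of the next run.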
Previously, we also recommended using a rounded `start_date` in relation to your DAG's schedule. This meant an `@hourly` job would start at 00:00 (minutes:seconds), a `@daily` job at midnight, and a `@monthly` job on the first of the month. This is no longer required.
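As a rough illustration of what "rounded" meant for each preset, this sketch truncates a datetime down to the corresponding boundary. `rounded_start` is a hypothetical helper, and only the three presets mentioned above are handled.

```python
from datetime import datetime


def rounded_start(dt: datetime, preset: str) -> datetime:
    """Truncate dt to the boundary the old advice suggested for a schedule preset."""
    if preset == "@hourly":
        return dt.replace(minute=0, second=0, microsecond=0)
    if preset == "@daily":
        return dt.replace(hour=0, minute=0, second=0, microsecond=0)
    if preset == "@monthly":
        return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported preset: {preset}")


print(rounded_start(datetime(2021, 3, 15, 13, 47, 12), "@monthly"))  # 2021-03-01 00:00:00
```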
If for some reason we want to re-run DAGs on certain schedules manually, we can use the following CLI command. It will execute all DAG runs that were scheduled between START_DATE and END_DATE, irrespective of the value of the `catchup` parameter in `airflow.cfg`.
This question is a bit old, but it turns up as the first Google search result, and the highest-rated answer is clearly misleading (it made me struggle a bit), so it definitely deserves a proper answer. Although the second-rated answer should work, there's a cleaner way to do this, and I personally find using XCom ugly.
Airflow has a special operator class designed for monitoring the status of tasks from other DAG runs, or of other DAGs as a whole. So what we need to do is add a task that precedes all the tasks in our DAG and checks whether the previous run has succeeded.
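```python
from datetime import timedelta
# Airflow 1.10 path; on Airflow 2.x import from airflow.sensors.external_task instead.
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Root tasks (no upstream) captured before the sensor becomes a root itself.
original_roots = list(our_dag.roots)
# Wait for the previous DagRun of this DAG (external_task_id=None checks the whole run).
# execution_delta must be a timedelta; one day back for this daily DAG.
previous_dag_run_sensor = ExternalTaskSensor(
    task_id='previous_dag_run_sensor',
    dag=our_dag,
    external_dag_id=our_dag.dag_id,
    external_task_id=None,
    execution_delta=timedelta(days=1),
)
previous_dag_run_sensor.set_downstream(original_roots)
```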
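On your first task, set `depends_on_past=True` and `wait_for_downstream=True`; the combination means the current DagRun runs only if the last run succeeded. This is because the first task of the current DagRun then waits for its previous instance (`depends_on_past`) and for the tasks immediately downstream of that previous instance (`wait_for_downstream`) to succeed. Note that `wait_for_downstream` only looks one level down, so a failure deeper in the previous run is not caught.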
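Per the Airflow docs, `wait_for_downstream` makes a task wait for the tasks immediately downstream of its previous instance to finish successfully or be skipped. The pure-Python model below (hypothetical names, not Airflow code) applies that to the first task of a linear five-task chain like the one in the question, treating "success" and "skipped" as acceptable and an empty `prev_states` as the first run.

```python
from typing import Dict, List


def first_task_may_start(prev_states: Dict[str, str],
                         downstream: Dict[str, List[str]],
                         first_task: str) -> bool:
    """Model depends_on_past + wait_for_downstream on a DAG's first task.

    prev_states: task_id -> state in the previous DagRun (empty on the first run).
    downstream: task_id -> ids of the tasks directly downstream of it.
    """
    if not prev_states:
        return True  # first run: nothing to depend on
    ok = {"success", "skipped"}
    # depends_on_past: the previous instance of the first task itself must be done.
    if prev_states.get(first_task) not in ok:
        return False
    # wait_for_downstream: only the tasks *immediately* downstream are checked.
    return all(prev_states.get(t) in ok for t in downstream.get(first_task, []))


chain = {"t1": ["t2"], "t2": ["t3"], "t3": ["t4"], "t4": ["t5"]}
friday = {"t1": "success", "t2": "success", "t3": "success",
          "t4": "failed", "t5": "upstream_failed"}
print(first_task_may_start(friday, chain, "t1"))  # True: the t4 failure is not seen
```

In this model the gate only works when the failure lands on the first task or one of its direct children, which is why a sensor on the whole previous DagRun is the more reliable option for failures deeper in the DAG.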