
Airflow depends_on_past for whole DAG

Is there a way in Airflow of applying depends_on_past to an entire DagRun, not just to a Task?

I have a daily DAG, and the Friday DagRun errored on the 4th task; however, the Saturday and Sunday DagRuns still ran as scheduled. Using depends_on_past = True would have paused those DagRuns at the same 4th task, but the first 3 tasks would still have run.

I can see in the DagRun DB table there is a state column that contains failed for the Friday DagRun. What I want is a way of configuring a DagRun not to start at all if the previous DagRun failed, rather than starting and running until it hits a Task that previously failed.

Does anyone know if this is possible?

asked Nov 28 '17 14:11 by chop4433

People also ask

How many DAGs can Airflow run at once?

concurrency: The maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This allows you to let one DAG run 32 tasks at once, while another DAG can be set to run 16 tasks at once.
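As a sketch of the setting described above (the DAG ids, dates, and limits are placeholder values, using the Airflow 1.x-era `concurrency` parameter):

```python
from datetime import datetime

from airflow import DAG

# This DAG may run at most 32 task instances at once,
# counted across all of its active DagRuns.
dag_a = DAG(
    dag_id='dag_a',
    start_date=datetime(2017, 1, 1),
    concurrency=32,
)

# This DAG is capped at 16 concurrent task instances.
dag_b = DAG(
    dag_id='dag_b',
    start_date=datetime(2017, 1, 1),
    concurrency=16,
)
```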

What is Depends_on_past in Airflow?

According to the official Airflow docs, the task instances directly upstream from the task need to be in a success state. Also, if you have set depends_on_past=True, the previous task instance needs to have succeeded (except if it is the first run for that task).

Is Start_date mandatory in Airflow DAG?

Previously, it was also recommended to use a rounded start_date in relation to your DAG's schedule: an @hourly job at 00 minutes:seconds, a @daily job at midnight, a @monthly job on the first of the month. This is no longer required.

How do you backfill a DAG Airflow?

If for some reason we want to re-run DAGs on certain schedules manually, we can use the backfill CLI command to do so. This will execute all DAG runs that were scheduled between START_DATE and END_DATE, irrespective of the value of the catchup parameter in airflow.cfg.
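As a sketch, the backfill invocation looks roughly like this (the DAG id `my_daily_dag` and the dates are placeholders; the Airflow 2.x subcommand is `airflow dags backfill`, while 1.x used `airflow backfill`):

```shell
# Airflow 2.x: re-run every DagRun scheduled between the two dates,
# regardless of the catchup setting.
airflow dags backfill \
    --start-date 2017-11-24 \
    --end-date 2017-11-26 \
    my_daily_dag

# Airflow 1.x equivalent:
# airflow backfill -s 2017-11-24 -e 2017-11-26 my_daily_dag
```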


2 Answers

This question is a bit old, but it turns up as the first Google search result and the highest-rated answer is clearly misleading (it made me struggle a bit), so it definitely demands a proper answer. Although the second-rated answer should work, there's a cleaner way to do this, and I personally find using XCom for it ugly.

Airflow has a special sensor class designed for monitoring the status of tasks from other DagRuns, or of other DAGs as a whole. So what we need to do is add a task preceding all the other tasks in our DAG that checks whether the previous run succeeded.

from airflow.sensors.external_task_sensor import ExternalTaskSensor

# With external_task_id left at its default of None, the sensor waits for
# the *whole* previous DagRun of this DAG to succeed, not a single task.
# execution_delta must be a timedelta, so this assumes the DAG's
# schedule_interval was defined as one (e.g. timedelta(days=1)).
previous_dag_run_sensor = ExternalTaskSensor(
    task_id='previous_dag_run_sensor',
    dag=our_dag,
    external_dag_id=our_dag.dag_id,
    execution_delta=our_dag.schedule_interval,
)

# Make every root task (in-degree zero) of the DAG depend on the sensor.
previous_dag_run_sensor.set_downstream(vertices_of_indegree_zero_from_our_dag)
answered Sep 18 '22 06:09 by Ponewor


On your first task, set depends_on_past=True and wait_for_downstream=True; the combination results in the current dag-run running only if the last run succeeded.

This is because, with those settings, the first task of the current dag-run waits for its own previous instance to succeed (depends_on_past) and also for the tasks immediately downstream of that previous instance (wait_for_downstream) to succeed.
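A minimal sketch of this setup (the DAG id, dates, and dummy tasks are placeholders, using Airflow 1.x-era import paths to match the other answer):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='my_daily_dag',
    start_date=datetime(2017, 11, 24),
    schedule_interval='@daily',
)

# depends_on_past: this task waits for its own previous instance to succeed.
# wait_for_downstream: it also waits for the tasks immediately downstream of
# that previous instance, so a failure later in yesterday's run blocks today.
first_task = DummyOperator(
    task_id='first_task',
    depends_on_past=True,
    wait_for_downstream=True,
    dag=dag,
)

second_task = DummyOperator(task_id='second_task', dag=dag)

first_task >> second_task
```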

answered Sep 21 '22 06:09 by WeiChing 林煒清