Force Airflow's backfill command to run sequentially

Is there any way to run a backfill sequentially, without running several DAG runs in parallel? For example, if I run the backfill over several dates, such as airflow backfill [dag] -s "2017-07-01" -e "2017-07-10", is there any way to finish each day's DAG run before moving on to the next day? Right now it finishes all days of each task before going on to the next task.

Thanks.

Alberto C. asked Jul 25 '17 08:07




1 Answer

You can set the max_active_runs parameter of your DAG to 1, which ensures that only one run of that DAG is active at any time. https://pythonhosted.org/airflow/code.html?highlight=concurrency#models
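A minimal sketch of that setting, assuming a DAG called 'dag' and a start date taken from the question's backfill range (both are placeholders, not part of the original answer). The import is guarded only so the snippet can be read and run without an Airflow installation:

```python
from datetime import datetime

# Keyword arguments for the DAG constructor.
dag_kwargs = dict(
    dag_id="dag",                      # placeholder DAG id
    start_date=datetime(2017, 7, 1),   # assumed; first date of the backfill
    max_active_runs=1,                 # at most one active run of this DAG
)

try:
    from airflow import DAG
    dag = DAG(**dag_kwargs)
except ImportError:
    dag = None  # Airflow is not installed; dag_kwargs still shows the setting
```

Note that this limits how many runs are active at once; it does not by itself say anything about the order of tasks inside a run.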

If you need an entire DAG run to complete before the next one starts, you can add an ExternalTaskSensor at the start of your DAG and a DummyOperator collection task at the end. Then set the ExternalTaskSensor to wait for the DummyOperator at the end of the previous run.

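from datetime import datetime, timedelta

from airflow import DAG
# Import paths for Airflow 1.x; later releases moved these modules.
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

# Assumed daily schedule; it must match the DAG's actual interval.
schedule_interval = timedelta(days=1)

dag = DAG(dag_id='dag',
          start_date=datetime(2017, 7, 1),  # assumed start date
          schedule_interval=schedule_interval)

# Waits until the 'collection' task of the previous run has succeeded.
wait_for_previous_operator = ExternalTaskSensor(
        task_id='wait_for_previous',
        external_dag_id='dag',
        external_task_id='collection',
        execution_delta=schedule_interval,
        dag=dag)

# Final task that all other tasks feed into; marks the run as complete.
collection_operator = DummyOperator(
        task_id='collection',
        dag=dag)

# your_other_tasks_list is a placeholder for the list of your own tasks.
wait_for_previous_operator.set_downstream(your_other_tasks_list)
collection_operator.set_upstream(your_other_tasks_list)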
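
To see which earlier run the sensor looks at, here is a small plain-Python sketch of the date arithmetic behind execution_delta: the sensor checks the run whose execution date is the current execution date minus the delta. The helper name previous_run_date and the daily interval are assumptions for illustration; the dates are the ones from the question.

```python
from datetime import datetime, timedelta


def previous_run_date(execution_date, execution_delta):
    """Execution date the sensor checks: the current date minus the delta."""
    return execution_date - execution_delta


schedule_interval = timedelta(days=1)  # assumed daily schedule

# The ten daily execution dates covered by the backfill in the question.
backfill_dates = [datetime(2017, 7, 1) + i * schedule_interval
                  for i in range(10)]

for current in backfill_dates[:3]:
    waited_on = previous_run_date(current, schedule_interval)
    print(current.date(), "waits for", waited_on.date())
```

Under this arithmetic the first date in the range points at a run from the day before the backfill starts, so that first run will probably need separate handling if no such run exists.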
Matthijs Brouns answered Sep 28 '22 04:09
