
Is it possible for Airflow scheduler to first finish the previous day's cycle before starting the next?

Right now, nodes in my DAG proceed to the next day's task before the rest of the nodes of that DAG finish. Is there a way to make the scheduler wait for the rest of the DAG to finish before moving on to the next day's DAG cycle?

(I do have depends_on_past set to True, but that does not work in this case.)

My DAG looks like this:

               O
               |
               v
O -> O -> O -> O -> O

Also, a tree view pic of the DAG:

[tree view pic of the dag]
asked Dec 07 '16 by user3542930

People also ask

How does the Airflow scheduler work?

The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) inspects active tasks to see whether they can be triggered.
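
As an illustration, any file in the dags folder that defines a DAG object is picked up by this loop; a minimal sketch (names and dates are hypothetical):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator

# Once this file lands in the dags folder, the scheduler parses it and
# triggers a run for each schedule interval.
dag = DAG(
    dag_id='hello_scheduler',  # hypothetical
    start_date=datetime(2016, 12, 1),
    schedule_interval=timedelta(days=1),
)

DummyOperator(task_id='start', dag=dag)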

How do I keep my Airflow scheduler running?

You should just run airflow scheduler without a num_runs param. The scheduler is designed to be a long-running process, an infinite loop; it orchestrates the work that is being done and is the heart of Airflow.

What is catchup in Airflow?

The scheduler, by default, will kick off a DAG Run for any data interval that has not been run since the last data interval (or has been cleared). This concept is called Catchup.
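
For example, catchup can be disabled per DAG so the scheduler only creates a run for the latest data interval; a minimal sketch (DAG id and dates are hypothetical):

from datetime import datetime, timedelta
from airflow import DAG

# With catchup=False the scheduler skips the backlog of missed data
# intervals and only schedules the most recent one.
dag = DAG(
    dag_id='no_catchup_example',  # hypothetical
    start_date=datetime(2016, 12, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
)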

How do I know if the Airflow scheduler is running?

At startup the scheduler creates a BaseJob record with information about the host and a timestamp (heartbeat), and then updates it regularly. You can use this to check whether the scheduler is working correctly by running the airflow jobs check command; on failure, the command exits with a non-zero error code.


3 Answers

Might be a bit late for this answer, but I ran into the same issue, and the way I resolved it was to add two extra tasks to each DAG: "Previous" at the start and "Complete" at the end. The Previous task is an ExternalTaskSensor that monitors the previous run; Complete is just a dummy operator. Let's say the DAG runs every 30 minutes, so it would look like this:

from datetime import datetime, timedelta

# Airflow 2.x import paths; older versions use slightly different modules.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Minimal default_args; the original answer assumed these existed.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 12, 1),
}

dag = DAG(dag_id='TEST_DAG', default_args=default_args, schedule_interval=timedelta(minutes=30))

# Waits until the 'All_Tasks_Completed' task of the previous run
# (one schedule interval back) has succeeded.
PREVIOUS = ExternalTaskSensor(
    task_id='Previous_Run',
    external_dag_id='TEST_DAG',
    external_task_id='All_Tasks_Completed',
    allowed_states=['success'],
    execution_delta=timedelta(minutes=30),
    dag=dag
)

T1 = BashOperator(
    task_id='TASK_01',
    bash_command='echo "Hello World from Task 1"',
    dag=dag
)

# Dummy end marker that the next run's sensor waits on.
COMPLETE = DummyOperator(
    task_id='All_Tasks_Completed',
    dag=dag
)

PREVIOUS >> T1 >> COMPLETE

So even though the next DAG run enters the queue, its tasks will not run until PREVIOUS completes. Note that execution_delta must equal the schedule interval so the sensor targets the run immediately before the current one.

answered Oct 13 '22 by Oleg Yamin


What ended up working for me is a combination of:

  1. Adding the task-level arguments wait_for_downstream=True and depends_on_past=True.
  2. Passing max_active_runs=1 when creating the DAG (see the sketch below). I did try to add max_active_runs as a default argument, but that did not work.
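
Put together, a minimal sketch of that combination (DAG id and dates are hypothetical):

from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 12, 1),
    # Task-level settings: each task waits for its own previous instance
    # and for that instance's immediate downstream tasks to finish.
    'depends_on_past': True,
    'wait_for_downstream': True,
}

dag = DAG(
    dag_id='serial_runs_example',  # hypothetical
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    max_active_runs=1,  # must be set on the DAG; it has no effect in default_args
)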
answered Oct 13 '22 by user2006865


If you just want one DAG run active at a time, try setting max_active_runs=1 when creating the DAG.

answered Oct 13 '22 by user7126545