Right now, nodes in my DAG proceed to the next day's tasks before the rest of the nodes in that DAG finish. Is there a way to make it wait for the rest of the DAG to finish before moving on to the next day's DAG cycle?
(I do have depends_on_past set to True, but that does not work in this case.)
My DAG looks like this:
O
|
V
O -> O -> O -> O -> O
[Also, a tree view pic of the DAG]
The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) inspects active tasks to see whether they can be triggered.
You should just run airflow scheduler without a num_runs param. The scheduler is designed to be a long-running process, an infinite loop. It orchestrates the work that is being done; it is the heart of Airflow.
The scheduler, by default, will kick off a DAG Run for any data interval that has not been run since the last data interval (or has been cleared). This concept is called Catchup.
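To make that concrete, catchup can be toggled per DAG. A minimal sketch, assuming a recent Airflow where the DAG constructor accepts a catchup flag (the dag_id and dates here are placeholders):

from datetime import datetime

from airflow import DAG

# Placeholder DAG showing the catchup flag; with catchup=False the scheduler
# only creates a run for the most recent data interval instead of backfilling.
dag = DAG(
    dag_id='example_no_catchup',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
)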
CLI check for the scheduler: at startup, the scheduler creates a BaseJob record with information about the host and a timestamp (heartbeat), and then updates it regularly. You can use this to check whether the scheduler is working correctly with the airflow jobs check command. On failure, the command exits with a non-zero error code.
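For example, assuming Airflow 2.1+ (where the jobs subcommand exists), a liveness check for a scheduler running on the current machine might look like:

airflow jobs check --job-type SchedulerJob --hostname "$(hostname)"

The exact flags depend on your Airflow version; airflow jobs check --help lists what is available.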
Might be a bit late for this answer, but I ran into the same issue, and the way I resolved it was to add two extra tasks to each DAG: "Previous" at the start and "Complete" at the end. The Previous task is an ExternalTaskSensor that monitors the previous run; Complete is just a DummyOperator. Let's say the DAG runs every 30 minutes, so it would look like this:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# default_args assumed here to make the example runnable; adjust to your setup.
default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}

dag = DAG(dag_id='TEST_DAG', default_args=default_args, schedule_interval=timedelta(minutes=30))

# Waits for 'All_Tasks_Completed' in the previous run of this same DAG.
PREVIOUS = ExternalTaskSensor(
    task_id='Previous_Run',
    external_dag_id='TEST_DAG',
    external_task_id='All_Tasks_Completed',
    allowed_states=['success'],
    execution_delta=timedelta(minutes=30),  # must equal the schedule_interval
    dag=dag
)

T1 = BashOperator(
    task_id='TASK_01',
    bash_command='echo "Hello World from Task 1"',
    dag=dag
)

# Dummy end marker that the next run's sensor watches for.
COMPLETE = DummyOperator(
    task_id='All_Tasks_Completed',
    dag=dag
)

PREVIOUS >> T1 >> COMPLETE
So even though the next DAG run will enter the queue, its tasks will not run until PREVIOUS completes.
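One caveat with this pattern: execution_delta has to match the DAG's schedule_interval (30 minutes here), or the sensor will poke for a run that never existed. Also, the very first run has no previous All_Tasks_Completed instance to find, so that first sensor will wait until it times out unless you mark it as success manually.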
What ended up working for me is a combination of the suggestions above. If you want to run just one DAG instance at a time, try setting max_active_runs=1.
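A minimal sketch of that, combined with the depends_on_past setting from the question (dag_id and dates are placeholders; both parameters are standard DAG/task arguments):

from datetime import datetime

from airflow import DAG

# Placeholder DAG: max_active_runs=1 allows only one run at a time, and
# depends_on_past=True makes each task wait for its own previous instance.
dag = DAG(
    dag_id='example_serial_runs',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    max_active_runs=1,
    default_args={'depends_on_past': True},
)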