 

Airflow: what's the standard way of delaying a day's DAG run?

I've got a DAG that's scheduled to run daily. Normally, the scheduler triggers the run for an execution_date as soon as that execution_date's interval is complete, i.e., the next day. However, due to upstream delays, I only want to kick off the DAG run for a given execution_date three days after that execution_date. In other words, I want to introduce a three-day lag.

From the research I've done, one route would be to add a TimeDeltaSensor at the beginning of my dag run with delta=datetime.timedelta(days=3).
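For reference, here is a minimal standard-library sketch of the check TimeDeltaSensor performs on each poke. It is modelled on the Airflow 2.x source, where the sensor succeeds once the current time is past the run's data_interval_end plus delta (Airflow 1.10 used the following schedule of the execution_date, which is the same moment for a daily DAG). The name sensor_ready is hypothetical and nothing here imports Airflow; check the sensor in your own Airflow version before relying on the exact semantics.

```python
from datetime import datetime, timedelta, timezone

def sensor_ready(data_interval_end, delta, now):
    """Hypothetical stand-in for TimeDeltaSensor's poke condition:
    succeed once `now` is strictly past the interval end plus `delta`."""
    target = data_interval_end + delta
    return now > target

# A daily run for logical date 2018-03-15 covers 03-15 00:00 to 03-16 00:00,
# so the run is created at 03-16 00:00 and the sensor releases after 03-19 00:00.
interval_end = datetime(2018, 3, 16, tzinfo=timezone.utc)
delta = timedelta(days=3)

print(sensor_ready(interval_end, delta, datetime(2018, 3, 18, 12, tzinfo=timezone.utc)))  # False
print(sensor_ready(interval_end, delta, datetime(2018, 3, 19, 0, 1, tzinfo=timezone.utc)))  # True
```

Note that the delay is counted from the end of the interval, and the DAG run already exists for that whole wait; that open run is what the scheduler keeps revisiting.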

However, due to the way the Airflow scheduler is implemented, that's problematic. Under this approach, each of my DAG runs will be active for over three days. My DAG has lots of tasks, and when several DAG runs are active, I've noticed that the scheduler eats up lots of CPU because it's constantly iterating over all these tasks (even inactive ones). So is there another way to tell the scheduler not to kick off the DAG run until three days have passed?
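To put rough numbers on the CPU concern, here is a back-of-the-envelope sketch that counts how many daily runs are open at a given moment. It assumes (hypothetically) that each run is created at the end of its day and stays active for the sensor wait plus its real runtime; active_runs and the two-hour runtime are illustrative, not measured.

```python
from datetime import datetime, timedelta

def active_runs(now, delay, runtime, horizon_days=30):
    """Hypothetical helper: logical dates of daily runs still open at `now`.

    Assumes each run is created at the end of its day and stays active for
    `delay` (time spent waiting in the sensor) plus `runtime` (the real work).
    """
    active = []
    # Start from midnight `horizon_days` ago and walk forward one day at a time.
    day = datetime(now.year, now.month, now.day) - timedelta(days=horizon_days)
    while day <= now:
        created = day + timedelta(days=1)        # end of the daily interval
        finished = created + delay + runtime     # sensor wait + actual tasks
        if created <= now < finished:
            active.append(day.date())
        day += timedelta(days=1)
    return active

now = datetime(2018, 3, 20, 12)
print(len(active_runs(now, timedelta(days=3), timedelta(hours=2))))  # 3
print(len(active_runs(now, timedelta(0), timedelta(hours=2))))       # 0
```

Under these assumptions, the three-day sensor keeps three runs (and all of their task instances) open at midday, while without it none are.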

asked Mar 15 '18 by conradlee


1 Answer

It might be easier to manipulate the date variable within the DAG.

I'm assuming you use the execution date ds in your tasks in some way, for example to query data for that day.

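In this case you could use the built-in macros to manipulate the date, e.g. macros.ds_add(ds, -3) to shift the date back by three days. The run Airflow schedules for day D then processes data for day D - 3, which gives you the three-day lag without a sensor holding each DAG run open.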
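As an illustration of what the macro computes, here is a small standard-library stand-in for macros.ds_add, written so it runs without Airflow installed. It mirrors the macro's behavior (parse a YYYY-MM-DD string, add a number of days, format it back) but it is not Airflow's own code.

```python
from datetime import datetime, timedelta

def ds_add(ds: str, days: int) -> str:
    """Stand-in for Airflow's macros.ds_add: shift a YYYY-MM-DD string by `days`."""
    if not days:
        return str(ds)
    shifted = datetime.strptime(str(ds), "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")

# The run whose ds is 2018-03-18 would process data for 2018-03-15.
print(ds_add("2018-03-18", -3))  # 2018-03-15
# Month boundaries are handled by datetime arithmetic.
print(ds_add("2018-03-01", -3))  # 2018-02-26
```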

You can use it in any templated field as usual: '{{ macros.ds_add(ds, -3) }}'

See the macros section of the Airflow templates reference for details.

answered Oct 18 '22 by Blakey