In Airflow, I would like to run a DAG every Monday at 8 am (the execution_date should of course be "the current Monday at 8 am"). The relevant parameters for this workflow are:
start_date: "2018-03-19"
schedule_interval: "0 8 * * MON"
I expect to see a DAG run every Monday at 8 am, the first one on 2018-03-19 at 8 am with execution_date = 2018-03-19T08:00:00, and so on each Monday.
However, that is not what happens: the DAG is not started on 2018-03-19 at 8 am. The actual behaviour is explained, for example, here: https://stackoverflow.com/a/39620901/1510109 or https://stackoverflow.com/a/48213964/1510109. At the end of each interval (weekly in my case), the DAG is run with execution_date = the beginning of that interval (i.e. the previous week). This behaviour is apparently motivated by an "ETL way of thinking" (see the links above), but it is absolutely not what I want.
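To make that behaviour concrete, here is a simplified model of it in plain Python. This is not Airflow code: the function name weekly_runs is hypothetical, and the model assumes a fixed 7-day interval and a start_date already aligned to the Monday 08:00 slot. It pairs each run's execution_date with the earliest time the scheduler would actually trigger it.

```python
from datetime import datetime, timedelta


def weekly_runs(start_date, interval=timedelta(days=7), count=3):
    """Return (execution_date, earliest_trigger_time) pairs.

    Simplified model: the run stamped with execution_date T is only
    triggered once the interval [T, T + interval) has fully elapsed.
    """
    runs = []
    execution_date = start_date
    for _ in range(count):
        runs.append((execution_date, execution_date + interval))
        execution_date += interval
    return runs


for execution_date, trigger_time in weekly_runs(datetime(2018, 3, 19, 8, 0)):
    print(f"execution_date={execution_date:%Y-%m-%d %H:%M}  "
          f"triggered after {trigger_time:%Y-%m-%d %H:%M}")
```

Under this model the run stamped 2018-03-19 08:00 is only triggered after 2018-03-26 08:00, which is the one-week lag described above.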
How can I run my DAG every Monday at 08:00 with execution_date = trigger date = now (i.e. the current Monday at 8 am)?
Thanks
Take a quick look at my answer with start times and execution_date examples.
You want to run every Monday at 8am.
So this part is going to stay the same:
schedule_interval: '0 8 * * MON',
You want the first run to happen on 2018-03-19. Since the first run occurs at the end of the first full schedule period after the start date, you should change your start date to:
start_date: datetime(2018, 3, 12),
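The new start date is simply one interval before the first trigger you want. A minimal sketch of that arithmetic, assuming a fixed 7-day interval (the helper name start_date_for_first_run is hypothetical, not part of Airflow):

```python
from datetime import datetime, timedelta


def start_date_for_first_run(first_trigger, interval=timedelta(days=7)):
    """Return the start_date that makes the first run trigger at first_trigger.

    The first run only fires once a full interval after start_date has
    elapsed, so start_date has to sit exactly one interval earlier.
    """
    return first_trigger - interval


print(start_date_for_first_run(datetime(2018, 3, 19, 8, 0)))
```

Using midnight, as in datetime(2018, 3, 12), should give the same result here, as the answer's own timeline implies: the first "0 8 * * MON" slot on or after that midnight is 08:00 the same day.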
You will have to live with the fact that Airflow will name your DagRuns with the start of each period and pass in macros based on the execution_date set to the start of the interval period. Adjust your logic accordingly.
Your first run will start after 2018-03-19T08:00:00.0Z, and the execution_date, every other macro that depends on it, and the name of the DagRun will be 2018-03-12T08:00:00.0Z.
So long as you understand what to expect from the execution_date and you don't try to base your time on datetime.now(), your DAGs will be idempotent in operation. Feel free to make a new variable like my_execution_date = execution_date + datetime.timedelta(7) within any PythonOperator or custom operator (you get execution_date from the context of the task), use template statements like {{ (execution_date + macros.timedelta(7)).strftime('%Y%m%d') }} or {{ macros.ds_add(ds, 7) }}, or use next_execution_date.
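The PythonOperator option can be sketched as a plain callable. This is a sketch under assumptions: my_task is a hypothetical name, in Airflow 1.x the operator needs provide_context=True for the context to be passed as keyword arguments, and ds_add below is a stdlib stand-in that mimics what macros.ds_add does, not Airflow's own code.

```python
from datetime import datetime, timedelta


def my_task(**context):
    """Hypothetical PythonOperator callable.

    The scheduler puts the run's execution_date (the start of the interval)
    into the task context; adding one weekly interval gives the Monday the
    run is actually triggered on.
    """
    execution_date = context["execution_date"]
    my_execution_date = execution_date + timedelta(days=7)
    return my_execution_date.strftime("%Y%m%d")


def ds_add(ds, days):
    """Stand-in for macros.ds_add: shift a YYYY-MM-DD string by `days` days."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


# Outside Airflow, pass a fake context by hand:
print(my_task(execution_date=datetime(2018, 3, 12, 8, 0)))
print(ds_add("2018-03-12", 7))
```

Both calls land on 2018-03-19, the Monday the first run is actually triggered on.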
You can even add a DAG-level user_defined_macros like {'dt': lambda d: d + datetime.timedelta(days=7)} to enable {{ dt(execution_date) }}. More recently, user_defined_filters were added; passing the same {'dt': lambda d: d + datetime.timedelta(days=7)} there enables {{ execution_date | dt }}. The built-in next_ds and next_execution_date macros would be easier for your purposes, though.
While thinking about templating, you may as well read up on the built-in Jinja filters: http://jinja.pocoo.org/docs/2.10/templates/#builtin-filters
That is how Airflow behaves: it always runs a DAG once its schedule interval has completed. The detailed behavior is described here and in the Airflow FAQ.
But to make it run "for the current week", what we can do is manipulate the DAG's execution_date, either by adding 7 days to the datetime object (for a weekly schedule) or by using the {{ next_execution_date }} macro.
Admittedly, this only helps if dates are actually used in your DAG or if dependencies are triggered by them.
To be clear, the DAG still runs according to its normal behavior; the only thing we are doing is manipulating the date inside the program/DAG.
import datetime

from airflow import DAG

args = {
    # ... your other default_args ...
    'start_date': datetime.datetime(2018, 3, 18),
}

dag = DAG(
    'weekly_example',  # hypothetical dag_id
    default_args=args,
    schedule_interval='@weekly',
    # ... other DAG arguments ...
)

# The DAG run for the week of 18 March 2018 starts on 2018-03-25,
# but its execution_date is 2018-03-18.
# By using the {{ next_execution_date }} macro, or by adding 7 days to
# execution_date, we work with 2018-03-25 instead of 2018-03-18.
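For a weekly schedule, next_execution_date is just the next '@weekly' slot (cron "0 0 * * 0", i.e. Sunday midnight) after the execution_date. The sketch below computes that slot with the standard library only; next_weekly_slot is a hypothetical helper, a stand-in for how the schedule advances and not Airflow's implementation.

```python
from datetime import datetime, timedelta


def next_weekly_slot(after):
    """Return the first Sunday 00:00 strictly after `after`.

    Stand-in for how '@weekly' (cron "0 0 * * 0") advances; not Airflow code.
    """
    midnight = after.replace(hour=0, minute=0, second=0, microsecond=0)
    days_ahead = (6 - midnight.weekday()) % 7  # weekday(): Monday=0 ... Sunday=6
    candidate = midnight + timedelta(days=days_ahead)
    if candidate <= after:
        candidate += timedelta(days=7)
    return candidate


execution_date = datetime(2018, 3, 18)  # a Sunday: start of the week being processed
print(next_weekly_slot(execution_date))
```

For execution_date 2018-03-18 this gives 2018-03-25 00:00, the same date as adding 7 days, which is why either approach works for a weekly schedule.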