I'm in the UTC+4 timezone, so when Airflow triggers the nightly ETLs, it's already 4:00 AM here. How can I tell Airflow to trigger the run for day ds already on day ds-1 at 20:00, but with ds=ds?
Per the docs, it's highly recommended to keep all servers on UTC, which is why I'm looking for an application-level solution.
EDIT: a hacky solution is to schedule it every day at 20:00, i.e. on the "previous" day, and then use tomorrow_ds instead of ds in the job. But that still looks odd in the Airflow UI, because the UI shows the UTC execution time.
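The date arithmetic behind that workaround can be checked with plain Python. This is a minimal sketch, assuming a fixed UTC+4 offset (Asia/Dubai observes no daylight saving time) and an arbitrary example date; tomorrow_ds here is a hypothetical re-implementation of the Airflow macro of the same name, not Airflow's own code. It shows that a run stamped 20:00 UTC on day ds-1 falls on local day ds, which is the date tomorrow_ds reports.

```python
from datetime import datetime, timedelta, timezone

# Assumption: UTC+4 modelled as a fixed offset (Asia/Dubai has no DST).
LOCAL_TZ = timezone(timedelta(hours=4))


def local_ds(execution_date_utc: datetime) -> str:
    """Return the local calendar date (YYYY-MM-DD) of a UTC execution date."""
    return execution_date_utc.astimezone(LOCAL_TZ).strftime("%Y-%m-%d")


def tomorrow_ds(execution_date_utc: datetime) -> str:
    """Hypothetical stand-in for Airflow's tomorrow_ds: execution date + 1 day."""
    return (execution_date_utc + timedelta(days=1)).strftime("%Y-%m-%d")


# A run scheduled at 20:00 UTC on the "previous" day (ds - 1).
run = datetime(2017, 11, 8, 20, 0, tzinfo=timezone.utc)
print(local_ds(run))     # 2017-11-09
print(tomorrow_ds(run))  # 2017-11-09
```

The two only agree for execution times between 20:00 and 23:59 UTC, which is exactly the window the 20:00 schedule uses.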
The schedule interval can also be a cron expression, which means you can easily run it at 20:00 UTC. Combined with user_defined_filters, that lets you get the behaviour you want with a bit of trickery:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
import pytz

tz = pytz.timezone('Asia/Dubai')


def localize_utc_tz(d):
    # Convert a UTC execution date to Asia/Dubai local time.
    # Naive datetimes (Airflow < 1.10) are taken to be UTC; timezone-aware
    # ones (Airflow 1.10+) would make tz.fromutc() raise, so use astimezone().
    if d.tzinfo is None:
        return tz.fromutc(d)
    return d.astimezone(tz)


default_args = {
    'start_date': datetime(2017, 11, 8),
}

dag = DAG(
    'plus_4_utc',
    default_args=default_args,
    schedule_interval='0 20 * * *',  # 20:00 UTC == 00:00 in UTC+4
    user_defined_filters={
        'localtz': localize_utc_tz,
    },
)

task = BashOperator(
    task_id='task_for_testing_file_log_handler',
    dag=dag,
    bash_command='echo UTC {{ ts }}, Local {{ execution_date | localtz }} next {{ next_execution_date | localtz }}',
)
This outputs:
UTC 2017-11-08T20:00:00, Local 2017-11-09 00:00:00+04:00 next 2017-11-10 00:00:00+04:00
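Why the localized values line up: with a daily '0 20 * * *' schedule, each run covers the interval from execution_date to next_execution_date, and shifted by +4 hours that interval is exactly one local calendar day. A minimal sketch of that arithmetic, assuming a fixed UTC+4 offset in place of pytz's Asia/Dubai zone (it has no DST); local_interval is a hypothetical helper, not part of Airflow:

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Dubai modelled as a fixed UTC+4 offset (no DST).
LOCAL_TZ = timezone(timedelta(hours=4))


def local_interval(execution_date: datetime, interval: timedelta = timedelta(days=1)):
    """Return (start, end) of a run's schedule interval in local time.

    execution_date is assumed to be UTC; naive values are treated as UTC,
    as Airflow before 1.10 passed them.
    """
    if execution_date.tzinfo is None:
        execution_date = execution_date.replace(tzinfo=timezone.utc)
    start = execution_date.astimezone(LOCAL_TZ)
    end = (execution_date + interval).astimezone(LOCAL_TZ)
    return start, end


start, end = local_interval(datetime(2017, 11, 8, 20, 0))
print(start)  # 2017-11-09 00:00:00+04:00
print(end)    # 2017-11-10 00:00:00+04:00
```

The two printed values match the Local and next values in the output above.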
You'll have to be careful about the types of the variables you use. For instance, ds and ts are strings, not datetime objects, which means the filter won't work on them.
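If you do want to feed ts into a filter, one option is to parse the string back into a datetime first. A minimal sketch: the filter name localize_any is hypothetical, the local zone is again assumed to be a fixed UTC+4 offset, and ts is assumed to be an ISO 8601 string such as 2017-11-08T20:00:00 or 2017-11-08T20:00:00+00:00, both of which datetime.fromisoformat accepts on Python 3.7+.

```python
from datetime import datetime, timedelta, timezone
from typing import Union

# Assumption: fixed UTC+4 offset for the local zone (Asia/Dubai has no DST).
LOCAL_TZ = timezone(timedelta(hours=4))


def localize_any(value: Union[str, datetime]) -> datetime:
    """Hypothetical filter: accept an ISO 8601 string (like ts or ds) or a
    datetime, treat it as UTC if it has no offset, and return local time."""
    if isinstance(value, str):
        value = datetime.fromisoformat(value)
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value.astimezone(LOCAL_TZ)


print(localize_any("2017-11-08T20:00:00"))        # 2017-11-09 00:00:00+04:00
print(localize_any("2017-11-08T20:00:00+00:00"))  # 2017-11-09 00:00:00+04:00
```

Registered as user_defined_filters={'localtz': localize_any}, it could be used as {{ ts | localtz }}. Note that ds carries no time of day, so it parses as midnight UTC and comes out as 04:00 local; ts is the better input.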
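I ran into the same problem: I have daily, hourly and half-hourly jobs. Since Airflow 1.10 you can give the DAG a timezone-aware start_date (here via pendulum), and a cron schedule_interval is then evaluated in that timezone, so '30 00 * * *' below fires at 00:30 India time: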
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum

local_tz = pendulum.timezone("Asia/Calcutta")

args = {
    'owner': 'ganesh',
    'depends_on_past': False,
    # A timezone-aware start_date makes Airflow (1.10+) evaluate the cron
    # schedule below in Asia/Calcutta time instead of UTC.
    'start_date': datetime(2020, 3, 25, tzinfo=local_tz),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='test1',
    default_args=args,
    schedule_interval='30 00 * * *',  # 00:30 local (Asia/Calcutta) time
)

first_date = BashOperator(
    task_id='first_date',
    bash_command='date',
    dag=dag,
    env=None,
    output_encoding='utf-8',
)

second_date = BashOperator(
    task_id='second_date',
    bash_command='echo date',
    dag=dag,
    env=None,
    output_encoding='utf-8',
)

first_date >> second_date
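Because the start_date is timezone-aware, Airflow 1.10+ reads the '30 00 * * *' schedule as 00:30 India time. To see what that means in UTC, here is a minimal sketch with the standard library, assuming Asia/Calcutta can be modelled as a fixed UTC+05:30 offset (India observes no DST); the helper name local_cron_time_in_utc is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Calcutta modelled as a fixed UTC+05:30 offset (no DST).
IST = timezone(timedelta(hours=5, minutes=30))


def local_cron_time_in_utc(day: datetime, hour: int, minute: int) -> datetime:
    """Return the UTC instant of a daily local-time trigger at hour:minute on `day`."""
    local = datetime(day.year, day.month, day.day, hour, minute, tzinfo=IST)
    return local.astimezone(timezone.utc)


fire = local_cron_time_in_utc(datetime(2020, 3, 25), hour=0, minute=30)
print(fire)  # 2020-03-24 19:00:00+00:00
```

So 00:30 in India corresponds to 19:00 UTC on the previous calendar day, the same "previous day in UTC" effect the question asks for.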