
How to trigger daily DAG run at midnight local time instead of midnight UTC time

I'm in the UTC+4 timezone, so when Airflow triggers the nightly ETLs at midnight UTC, it's already 4:00 AM here. How can I tell Airflow to trigger the run for day ds already on day ds-1 at 20:00 UTC, but with ds unchanged?

Per the docs it's highly recommended to keep all servers on UTC, so that's why I'm looking for an application-level solution.

EDIT: a hacky solution is to schedule it to run every day at 20:00, i.e. on the "previous" day, and then use tomorrow_ds instead of ds in the job. But that still looks weird in the Airflow UI, because it shows the UTC execution time.
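To see why the tomorrow_ds workaround lines up, here is a minimal stdlib sketch (no Airflow needed) using a fixed UTC+4 offset as a stand-in for the local zone:

```python
from datetime import datetime, timezone, timedelta

# Fixed UTC+4 offset standing in for the local timezone (assumption).
LOCAL = timezone(timedelta(hours=4))

# Airflow fires the run at 20:00 UTC on ds...
utc_fire = datetime(2017, 11, 8, 20, 0, tzinfo=timezone.utc)

# ...which is already midnight of the NEXT day in local time,
# so the job body should use tomorrow_ds rather than ds.
local_fire = utc_fire.astimezone(LOCAL)
print(local_fire)  # 2017-11-09 00:00:00+04:00
```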

Asked Nov 04 '17 by Marton Trencseni
2 Answers

The schedule interval can also be a cron expression, which means you can easily run the DAG at 20:00 UTC. Coupled with user_defined_filters, a bit of trickery gets you the behaviour you want:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

import pytz
tz = pytz.timezone('Asia/Dubai')


def localize_utc_tz(d):
    return tz.fromutc(d)

default_args = {
    'start_date': datetime(2017, 11, 8),
}
dag = DAG(
    'plus_4_utc',
    default_args=default_args,
    schedule_interval='0 20 * * *',
    user_defined_filters={
        'localtz': localize_utc_tz,
    },
)
task = BashOperator(
    task_id='task_for_testing_file_log_handler',
    dag=dag,
    bash_command='echo UTC {{ ts }}, Local {{ execution_date | localtz }} next {{ next_execution_date | localtz }}',
)

This outputs:

UTC 2017-11-08T20:00:00, Local 2017-11-09 00:00:00+04:00 next 2017-11-10 00:00:00+04:00

You'll have to be careful about the types of the variables you use. For instance, ds and ts are strings, not datetime objects, which means the filter won't work on them.
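If you do want a filter that also accepts the string variables, one option is to parse them first. A minimal sketch (stdlib only; the fixed UTC+4 offset and the localize_ts name are assumptions, not part of the answer above):

```python
from datetime import datetime, timezone, timedelta

LOCAL = timezone(timedelta(hours=4))  # stand-in for the DAG's local timezone

def localize_ts(ts_string):
    # ts renders as a string like '2017-11-08T20:00:00': parse it,
    # mark it as UTC, then convert. A plain string can't be localized.
    d = datetime.fromisoformat(ts_string).replace(tzinfo=timezone.utc)
    return d.astimezone(LOCAL)

print(localize_ts('2017-11-08T20:00:00'))  # 2017-11-09 00:00:00+04:00
```

Registered as a user_defined_filter, this would let templates like `{{ ts | localize_ts }}` work alongside the datetime-based filter.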

Answered by Ash Berlin-Taylor

I ran into the same problem. I have daily, hourly, and half-hourly jobs.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum

local_tz = pendulum.timezone("Asia/Calcutta")

args = {
    'owner': 'ganesh',
    'depends_on_past': False,
    'start_date': datetime(2020, 3, 25, tzinfo=local_tz),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='test1',
    default_args=args,
    schedule_interval='30 00 * * *',
)

first_date = BashOperator(
    task_id='first_date',
    bash_command='date',
    dag=dag,
    env=None,
    output_encoding='utf-8',
)

second_date = BashOperator(
    task_id='second_date',
    bash_command='echo date',
    dag=dag,
    env=None,
    output_encoding='utf-8',
)

first_date >> second_date
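The key idea in this answer is that since Airflow 1.10 the scheduler honours a timezone-aware start_date, so the cron expression is evaluated in that zone. A stdlib sketch of what that means for the `30 00 * * *` schedule above (using zoneinfo, Python 3.9+, in place of pendulum):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # stdlib replacement for pendulum.timezone

tz = ZoneInfo("Asia/Calcutta")

# 00:30 local time in Asia/Calcutta (UTC+5:30)...
local_run = datetime(2020, 3, 26, 0, 30, tzinfo=tz)

# ...corresponds to 19:00 UTC on the previous day, which is when
# the scheduler actually fires the run internally.
print(local_run.astimezone(timezone.utc))  # 2020-03-25 19:00:00+00:00
```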

Answered by Ganesh