
Running an Airflow DAG every X minutes

I am using Airflow on an EC2 instance with the LocalScheduler option. I've invoked airflow scheduler and airflow webserver and everything seems to be running fine. That said, after setting schedule_interval to the cron string for "do this every 10 minutes", '*/10 * * * *', the job continues to execute every 24 hours by default. Here's the header of the code:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import workers
else:
    print('Define PREPROC_PATH value in environment variables')
    sys.exit(1)

default_args = {
  'start_date': datetime(2017, 9, 9, 10, 0, 0, 0), #..EC2 time. Equal to 11pm Mexico time
  'max_active_runs': 1,
  'concurrency': 4,
  'schedule_interval': '*/10 * * * *' #..every 10 minutes
}

DAG = DAG(
  dag_id='dash_update',
  default_args=default_args
)

...
asked Sep 12 '17 by aaron
2 Answers

default_args is only meant to fill params passed to operators within a DAG. max_active_runs, concurrency, and schedule_interval are all parameters for initializing your DAG, not operators. This is what you want:

DAG = DAG(
  dag_id='dash_update',
  start_date=datetime(2017, 9, 9, 10, 0, 0, 0), #..EC2 time. Equal to 11pm Mexico time
  max_active_runs=1,
  concurrency=4,
  schedule_interval='*/10 * * * *', #..every 10 minutes
  default_args=default_args,
)

I've mixed them up before as well, so for reference (note there are overlaps):

DAG parameters: https://airflow.incubator.apache.org/code.html?highlight=dag#airflow.models.DAG
Operator parameters: https://airflow.incubator.apache.org/code.html#baseoperator
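To see why the split matters, here is a minimal stdlib-only sketch of the idea (the `build_operator` function is hypothetical, not Airflow's real code): default_args act as per-operator defaults that each operator's explicit keyword arguments override, whereas DAG-level settings like schedule_interval are consumed by the DAG itself and never reach the operators.

```python
# Hypothetical sketch of default_args semantics; not Airflow's implementation.
def build_operator(dag_default_args, **explicit_kwargs):
    """Merge the DAG's default_args with the operator's own kwargs.

    Explicit operator kwargs win over the DAG-level defaults.
    """
    return {**dag_default_args, **explicit_kwargs}

default_args = {'owner': 'aaron', 'retries': 2}

# The operator's own retries=5 overrides the default of 2.
task = build_operator(default_args, task_id='update', retries=5)
print(task)  # {'owner': 'aaron', 'retries': 5, 'task_id': 'update'}
```

This is why putting schedule_interval inside default_args has no effect on scheduling: it is only ever offered to operators, which have no use for it.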

answered by Daniel Huang

For Airflow versions >2.1 you can use a datetime.timedelta object:

from datetime import datetime, timedelta

DAG = DAG(
  dag_id='dash_update',
  start_date=datetime(2017, 9, 9, 10, 0, 0, 0),
  max_active_runs=1,
  concurrency=4,
  schedule_interval=timedelta(minutes=10),
  default_args=default_args,
)

Another cool feature for handling start_date is days_ago:

from datetime import timedelta

from airflow.utils.dates import days_ago

DAG = DAG(
  dag_id='dash_update',
  start_date=days_ago(2, minute=15), # would start 2 days ago at 00:15
  max_active_runs=1,
  concurrency=4,
  schedule_interval=timedelta(minutes=10),
  default_args=default_args,
)
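Both forms describe the same cadence. As a quick stdlib-only illustration (plain datetime arithmetic, not Airflow internals) of what a timedelta(minutes=10) interval means for run times:

```python
from datetime import datetime, timedelta

# Plain-Python illustration, not Airflow internals: an interval-based
# schedule advances each run by a fixed delta from the start date.
start = datetime(2017, 9, 9, 10, 0, 0)
interval = timedelta(minutes=10)

first_three_runs = [start + i * interval for i in range(3)]
for run in first_three_runs:
    print(run)
# 2017-09-09 10:00:00
# 2017-09-09 10:10:00
# 2017-09-09 10:20:00
```

One subtlety: the cron form '*/10 * * * *' pins runs to wall-clock minutes 0, 10, 20, ..., while a timedelta interval is offset from start_date. The two coincide here only because start_date falls exactly on the hour.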
answered by dl.meteo