I am using Airflow on an EC2 instance with the LocalExecutor. I've started airflow scheduler and airflow webserver, and everything seems to be running fine. However, after setting schedule_interval to the cron string '*/10 * * * *' ("do this every 10 minutes"), the job still runs every 24 hours, which is the default. Here's the header of the code:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import workers
else:
    print('Define PREPROC_PATH value in environmental variables')
    sys.exit(1)
default_args = {
    'start_date': datetime(2017, 9, 9, 10, 0, 0, 0),  # EC2 time; equal to 11pm Mexico time
    'max_active_runs': 1,
    'concurrency': 4,
    'schedule_interval': '*/10 * * * *'  # every 10 minutes
}
DAG = DAG(
    dag_id='dash_update',
    default_args=default_args
)
...
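default_args is only used to fill in arguments passed to the operators within a DAG. max_active_runs, concurrency, and schedule_interval are all parameters for initializing your DAG, not operators. Placed in default_args they are silently ignored, so the DAG falls back to its default schedule of once a day. This is what you want: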
DAG = DAG(
    dag_id='dash_update',
    start_date=datetime(2017, 9, 9, 10, 0, 0, 0),  # EC2 time; equal to 11pm Mexico time
    max_active_runs=1,
    concurrency=4,
    schedule_interval='*/10 * * * *',  # every 10 minutes
    default_args=default_args,  # keep only operator arguments (e.g. retries) here
)
I've mixed them up before as well, so for reference (note that the two lists overlap):
DAG parameters: https://airflow.incubator.apache.org/code.html?highlight=dag#airflow.models.DAG
Operator parameters: https://airflow.incubator.apache.org/code.html#baseoperator
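A simplified, pure-Python sketch of why those keys are ignored: an operator only picks up default_args entries whose names match one of its constructor parameters, and explicitly passed arguments take precedence over defaults. FakeOperator and merge_default_args are hypothetical stand-ins written for illustration, not Airflow's real classes, and Airflow's own merging logic handles more cases than this.

```python
import inspect
from datetime import datetime


class FakeOperator:
    """Hypothetical stand-in for an Airflow operator (not the real BaseOperator)."""

    def __init__(self, task_id, start_date=None, retries=0):
        self.task_id = task_id
        self.start_date = start_date
        self.retries = retries


def merge_default_args(operator_cls, default_args, **kwargs):
    """Fill missing constructor arguments from default_args (simplified sketch)."""
    params = inspect.signature(operator_cls.__init__).parameters
    merged = dict(kwargs)
    for name in params:
        # Only names that are constructor parameters and were not passed explicitly.
        if name not in merged and name in default_args:
            merged[name] = default_args[name]
    return merged


default_args = {
    "start_date": datetime(2017, 9, 9, 10),
    "retries": 2,
    "schedule_interval": "*/10 * * * *",  # DAG-level key: no operator parameter matches it
}

kwargs = merge_default_args(FakeOperator, default_args, task_id="update")
op = FakeOperator(**kwargs)
print(kwargs)
```

Because no operator parameter is named schedule_interval, that entry is dropped here; the DAG constructor does not read it from default_args either, which is why the DAG in the question kept its default daily schedule.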
schedule_interval also accepts a datetime.timedelta object instead of a cron string:
from datetime import datetime, timedelta

DAG = DAG(
    dag_id='dash_update',
    start_date=datetime(2017, 9, 9, 10, 0, 0, 0),
    max_active_runs=1,
    concurrency=4,
    schedule_interval=timedelta(minutes=10),  # every 10 minutes
    default_args=default_args,
)
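One practical difference between the two forms: a timedelta interval is counted from start_date itself, while a cron string like '*/10 * * * *' fires on wall-clock minutes divisible by 10. The sketch below is a simplified model of that alignment, not Airflow's scheduler code; timedelta_ticks and every_n_minutes_ticks are hypothetical helpers, and the cron helper only handles minute steps that divide 60 evenly. With the answer's start_date of 10:00 the two schedules coincide; with an unaligned start_date they do not.

```python
from datetime import datetime, timedelta


def timedelta_ticks(start, interval, n):
    """First n schedule points for a timedelta interval: counted from start itself."""
    return [start + i * interval for i in range(n)]


def every_n_minutes_ticks(start, step, n):
    """First n points of a '*/step * * * *' cron expression at or after start.

    Simplified model: assumes step divides 60 evenly.
    """
    # Round up to the next whole minute, then to the next multiple of step.
    first = start.replace(second=0, microsecond=0)
    if first < start:
        first += timedelta(minutes=1)
    while first.minute % step != 0:
        first += timedelta(minutes=1)
    return [first + timedelta(minutes=step * i) for i in range(n)]


unaligned = datetime(2017, 9, 9, 10, 3)  # start_date NOT on a 10-minute boundary
print(timedelta_ticks(unaligned, timedelta(minutes=10), 3))  # 10:03, 10:13, 10:23
print(every_n_minutes_ticks(unaligned, 10, 3))  # 10:10, 10:20, 10:30
```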
Another handy helper for setting start_date is days_ago:
from datetime import timedelta

from airflow.utils.dates import days_ago

DAG = DAG(
    dag_id='dash_update',
    start_date=days_ago(2, minute=15),  # 2 days ago at 00:15 UTC
    max_active_runs=1,
    concurrency=4,
    schedule_interval=timedelta(minutes=10),
    default_args=default_args,
)
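days_ago(n, ...) returns a timezone-aware UTC datetime n days in the past, with the time of day taken from its hour/minute/second/microsecond arguments (midnight by default). The sketch below reproduces that behavior in plain Python so it can be checked without Airflow; days_ago_sketch and its now parameter are hypothetical, added only to make the result reproducible. Keep in mind that the value is recomputed every time the DAG file is parsed, so it moves forward each day; Airflow's docs recommend a fixed start_date, and days_ago is deprecated in newer Airflow 2.x releases.

```python
from datetime import datetime, timedelta, timezone


def days_ago_sketch(n, hour=0, minute=0, second=0, microsecond=0, now=None):
    """Return a UTC datetime n days before `now`, with the given time of day.

    `now` is a hypothetical parameter (defaults to the current UTC time).
    """
    if now is None:
        now = datetime.now(timezone.utc)
    today = now.replace(hour=hour, minute=minute, second=second, microsecond=microsecond)
    return today - timedelta(days=n)


fixed_now = datetime(2017, 9, 11, 18, 42, tzinfo=timezone.utc)
print(days_ago_sketch(2, minute=15, now=fixed_now))  # 2017-09-09 00:15:00+00:00
```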