How to get Airflow's schedule_interval to work correctly

Tags:

airflow

I want to try using Airflow instead of cron, but schedule_interval doesn't work as I expected.

I wrote the Python code below.
In my understanding, Airflow should have run on "2016/03/30 8:15:00", but it didn't run at that time.

If I change it to "'schedule_interval': timedelta(minutes=5)", it works correctly, I think.

The "notice_slack.sh" is just to call slack api to my channels.

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29, 8, 15),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = '/tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)

I want to run some of my scripts at a specific time every day, like this cron setting:

15 08 * * * bash /tmp/notice_slack.sh

I have read the Scheduling & Triggers documentation, and I know it's a little different from cron.
So I tried adjusting the "start_date" and "schedule_interval" settings.

Does anyone know what I should do?

airflow version

INFO - Using executor LocalExecutor

v1.7.0

amazon-linux-ami/2015.09-release-notes

asked Mar 30 '16 at 02:03 by k16
People also ask

How do I run an Airflow job?

To start the Airflow job scheduler, you execute the airflow scheduler command. It uses the configuration specified in airflow.cfg. The scheduler runs jobs with a schedule_interval AFTER the start date, at the END of the period.

How do you run a DAG daily?

To run a DAG on a regular interval rather than at a specific time, you can pass a timedelta object as the schedule_interval. For example, schedule_interval=timedelta(minutes=30) will run the DAG every thirty minutes, and schedule_interval=timedelta(days=1) will run the DAG every day (see the sketch below).
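
As a rough illustration of the end-of-period rule described above, here is a minimal sketch (my own illustration, not part of the original question or answer); only the datetime arithmetic matters, and the values mirror the question:

from datetime import datetime, timedelta

# "@daily" behaves like timedelta(days=1): the run that covers the period
# starting at start_date is only triggered once that period has ended.
start_date = datetime(2016, 3, 29)
interval = timedelta(days=1)

first_execution_date = start_date            # the period being covered
first_trigger_time = start_date + interval   # when the scheduler actually fires it

print(first_execution_date)  # 2016-03-29 00:00:00
print(first_trigger_time)    # 2016-03-30 00:00:00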


1 Answer

Try this:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # start_date rounded down to midnight, per the best practice quoted below
    'start_date': datetime(2016, 3, 29),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    # cron-style schedule: 08:15 every day, same as "15 08 * * *" in crontab
    schedule_interval="15 08 * * *",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = 'bash /tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)

start_date (datetime) – The start_date for the task, determines the execution_date for the first task instance. The best practice is to have the start_date rounded to your DAG’s schedule_interval.

schedule_interval (datetime.timedelta or dateutil.relativedelta.relativedelta or str that acts as a cron expression) – Defines how often that DAG runs, this timedelta object gets added to your latest task instance’s execution_date to figure out the next schedule.

Simply configure the schedule_interval and bash_command to match your cron setting, as sketched below.
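
To make the timing concrete, here is a hedged sketch (my own illustration, not Airflow output) of when the first run should happen with the settings above, following the end-of-period rule quoted earlier:

from datetime import datetime, timedelta

# schedule_interval "15 08 * * *" fires at 08:15 every day, so the first
# schedule point at or after start_date (2016-03-29 00:00) is 08:15 that day...
first_execution_date = datetime(2016, 3, 29, 8, 15)

# ...and, because a run is triggered at the END of the period it covers,
# the scheduler actually launches it one interval later.
first_trigger_time = first_execution_date + timedelta(days=1)

print(first_execution_date)  # 2016-03-29 08:15:00
print(first_trigger_time)    # 2016-03-30 08:15:00

This is the behaviour the question was expecting: the script runs at 08:15 on 2016/03/30.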

answered Sep 22 '22 at 13:09 by ansvver