Code:
Python version 2.7.x and airflow version 1.5.1
my dag script is this
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['[email protected]'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
From that you could see that I am creating a DAG with 6 tasks the first task(Start1) starts first after which all the other five tasks starts
Currently I have given 5 minutes time delay between DAG's starting
It has ran perfectly for all the six tasks the first type but after five minutes the DAG is not re-initiated
It has been more then 1 hour still the DAG is not re-initiated I really don't know were I am wrong .
It would be really nice if some one could point me out what is wrong .I tried clearing using airflow testing clear
then to the same thing happen.It ran first instance then just stood there.
The only thing the command line shows is Getting all instance for DAG testing
When I change the position of the schedule_interval it just runs with out any schedule interval parallel.That is with in 5 minutes 300 or more task instance has been completed. There is no 5 minute schedule interval
Code 2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
For Code 2, I guess the reason why it runs every minute is:
The start time is 2015-10-13 00:00
The schedule interval is 5 minutes
Every heartbeat of scheduler(5 seconds by default), your DAG will be checked
The solution is set the DAG start_date as datetime.now() - schedule_interval
.
And also if you want to debug:
Setting the LOGGINGLEVEL to debug
in settings.py
Modify class method is_queueable()
of airflow.models.TaskInstance
to
:
def is_queueable(self, flag_upstream_failed=False):
logging.debug('Checking whether task instance is queueable or not!')
if self.execution_date > datetime.now() - self.task.schedule_interval:
logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now()))
return False
...
Because the start time(2015-10-13 00:00) less than now time, it triggers the airflow backfill. It will run from 2015-10-13 00:00 when every seconds the airflow scheduler detected(its the Start Date), but Execution Date is between 5 min(task interval time).
See the log name:
$tree airflow/logs/testing/
testing/
|-- Orders10
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders11
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders12
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders13
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders14
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
-- Start1
|-- 2015-10-13T00:00:00
|-- 2015-10-13T00:05:00
|-- 2015-10-13T00:10:00
-- 2015-10-13T00:15:00
See the create time of logs:
$ll airflow/logs/testing/Start1
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:00:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:05:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:51 2015-10-13T00:10:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:52 2015-10-13T00:15:00
Also, you can see the Task Instances on web UI:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With