Apache Airflow scheduler does not trigger DAG at schedule time

When I schedule DAGs to run at a specific time every day, the DAG execution does not take place at all. However, when I restart the Airflow webserver and scheduler, the DAGs execute once at the scheduled time for that particular day and do not execute from the next day onwards. I am using Airflow version v1.7.1.3 with Python 2.7.6. Here is the DAG code:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

import time

# start_date is computed dynamically as today's date
n = time.strftime("%Y,%m,%d")
v = datetime.strptime(n, "%Y,%m,%d")

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': v,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG('dag_user_answer_attempts', default_args=default_args, schedule_interval='03 02 * * *')

# t1 is an example of a task created by instantiating an operator
t1 = BashOperator(
    task_id='user_answer_attempts',
    bash_command='python /home/ubuntu/bigcrons/appengine-flask-skeleton-master/useranswerattemptsgen.py',
    dag=dag)

Am I doing something wrong?

asked Nov 21 '16 by Prabhjot

People also ask

How do you trigger an Airflow DAG automatically?

Airflow triggers the DAG automatically based on the specified scheduling parameters. You can also trigger a DAG manually, either from the Airflow UI or by running an Airflow CLI command.
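
For example, using the Airflow CLI against the DAG from the question (the exact subcommand depends on your Airflow version):

airflow trigger_dag dag_user_answer_attempts     # Airflow 1.x
airflow dags trigger dag_user_answer_attempts    # Airflow 2.x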

How often does Airflow check for new DAGs?

Airflow scans the dags_folder for new DAGs every dag_dir_list_interval , which defaults to 5 minutes but can be modified. You might have to wait until this interval has passed before a new DAG appears in the UI.
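
If you need new files picked up faster, the interval can be lowered in airflow.cfg (shown here with its default of 5 minutes; in recent versions it lives under the [scheduler] section):

[scheduler]
dag_dir_list_interval = 300    # seconds between scans of the dags_folder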

What is a schedule interval in Airflow?

Airflow's scheduler adopts its schedule-interval syntax from cron, so the smallest date-and-time granularity it supports is one minute. Inside Airflow, the only thing that runs continuously is the scheduler itself.
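
As an illustration (DAG ids and dates here are hypothetical), a cron expression and the equivalent built-in preset:

from airflow import DAG
from datetime import datetime

# Both schedule one run per day.
dag_cron = DAG('cron_example', start_date=datetime(2016, 11, 1),
               schedule_interval='3 2 * * *')    # 02:03 every day
dag_preset = DAG('preset_example', start_date=datetime(2016, 11, 1),
                 schedule_interval='@daily')     # midnight every day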


2 Answers

Your issue is that start_date is set to the current time. Airflow runs jobs at the end of an interval, not the beginning, so the first run of your job will only happen after the first full interval has passed.

Example:

You make a DAG and put it live in Airflow at midnight. Today (20XX-01-01 00:00:00) is also the start_date, but hard-coded ('start_date': datetime(20XX, 1, 1)). The schedule interval is daily, like yours (3 2 * * *).

The first time this DAG will be queued for execution is 20XX-01-02 02:03:00, because that is when the first interval period ends. If you look at the run at that time, its started datetime will be roughly one day after its schedule date (execution_date).

You can solve this by hard-coding start_date to a fixed date, or by making sure the dynamic date is further in the past than the interval between executions (in your case, 2 days would be plenty). Airflow recommends static start_dates in case you need to re-run jobs or backfill (or end a DAG); a minimal sketch of the fix is shown below.
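
Applied to the DAG from the question, a sketch of the fix (the date is illustrative; any fixed date at least one interval in the past works):

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2016, 11, 1),  # static date safely in the past
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG('dag_user_answer_attempts', default_args=default_args,
          schedule_interval='03 02 * * *')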

For more information on backfilling (the flip side of this common Stack Overflow question), check the docs or this question: Airflow not scheduling Correctly Python

answered Oct 18 '22 by apathyman


Check the following:

  1. start_date is a fixed time in the past (don't use datetime.now()).
  2. If you don't want to run against historical data, set catchup=False (see the sketch after this list).
  3. To have the DAG run at a specific time (e.g. hourly, daily, monthly), use cron syntax; https://crontab.guru/#40_21_*_*_* helps you write the expression you need.
  4. If steps 1-3 all look correct but the DAG still does not run, or it can run every few minutes but fails to trigger even once on a daily interval, create a new Python file, copy your DAG code into it, rename it so the file is unique, and test again. The Airflow scheduler can get confused by inconsistency between previous DAG runs' metadata and the current schedule.
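
A minimal sketch combining points 1 and 2 (the DAG id and dates are illustrative; the catchup argument is available from Airflow 1.8 onwards):

from airflow import DAG
from datetime import datetime

dag = DAG(
    'my_daily_dag',                     # hypothetical DAG id
    start_date=datetime(2016, 11, 1),   # fixed time in the past, not datetime.now()
    schedule_interval='40 21 * * *',    # 21:40 every day (see crontab.guru)
    catchup=False,                      # don't backfill historical runs
)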

Hope this helped!

answered Oct 18 '22 by Yingying