I have a DAG created in Apache Airflow. The scheduler seems configured to run it from June 2015. (By the way, I don't know why: it is a new DAG I created and I didn't backfill it; I only backfilled another DAG with a different DAG ID over those date intervals, yet the scheduler took those dates and backfilled my new DAG. I'm just starting to work with Airflow.)
(Update: I realized the DAG is backfilled because the start date is set in the DAG's default config, although this does not explain the behaviour I describe below.)
I'm trying to stop the scheduler from running all the DAG executions from that date. The command

airflow backfill --mark_success tutorial2 -s '2015-06-01' -e '2019-02-27'

is giving me database errors (see below), so I'm trying to set catchup to False instead.
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: job [SQL: 'INSERT INTO job (dag_id, state, job_type, start_date, end_date, latest_heartbeat, executor_class, hostname, unixname) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)'] [parameters: ('tutorial2', 'running', 'BackfillJob', '2019-02-27 10:52:37.281716', None, '2019-02-27 10:52:37.281733', 'SequentialExecutor', '08b6eb432df9', 'airflow')] (Background on this error at: http://sqlalche.me/e/e3q8)
So I'm using another approach. What I'm seeing in the web UI: DAG executions are being launched starting from June 2015, even though catchup is set to False in the DAG's configuration. So I don't understand why those DAG executions are being launched.
Thank you
DAG code:
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'catchup': False,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial2', default_args=default_args, schedule_interval='* * * * *')
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
Catchup. An Airflow DAG defined with a start_date, possibly an end_date, and a non-dataset schedule defines a series of intervals, which the scheduler turns into individual DAG runs and executes.
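As a rough illustration of why runs appear all the way back to June 2015, here is a plain-Python sketch (not Airflow's actual scheduling code; the function name is made up): with catchup on, every schedule interval that has fully elapsed between start_date and now becomes one DAG run.

```python
from datetime import datetime, timedelta

def missed_intervals(start_date, now, interval):
    """Toy model: enumerate the (start, end) pairs of every schedule
    interval that has fully elapsed between start_date and now."""
    runs = []
    cursor = start_date
    while cursor + interval <= now:
        runs.append((cursor, cursor + interval))
        cursor += interval
    return runs

# A start_date of 2015-06-01 with a daily schedule leaves four full
# intervals to catch up on by 2015-06-05:
runs = missed_intervals(datetime(2015, 6, 1), datetime(2015, 6, 5),
                        timedelta(days=1))
print(len(runs))  # 4
```

With the every-minute schedule_interval in the DAG above and a start_date years in the past, this list is enormous, which is why the UI fills with old runs unless catchup is disabled.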
To rerun a task in Airflow, you clear the task status, which updates the max_tries and current task instance state values in the metastore. After the task reruns, the max_tries value updates to 0, and the current task instance state updates to None.
Backfilling can be accomplished in Airflow using the CLI. You simply specify the DAG ID, as well as the start date and end date for the backfill period. This command runs the DAG for all intervals between the start date and end date. DAGs in your backfill interval are still rerun even if they already have DAG runs.
retries (int) – the number of retries that should be performed before failing the task. retry_delay (datetime.timedelta) – delay between retries. retry_exponential_backoff (bool) – allow progressively longer waits between retries by using an exponential backoff algorithm on the retry delay (the delay will be converted into seconds).
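To make the backoff behaviour concrete, here is a simplified plain-Python sketch (the function is made up and this is not Airflow's exact formula, which also adds jitter): with exponential backoff, each retry waits twice as long as the previous one.

```python
from datetime import timedelta

def backoff_delays(retry_delay, retries, exponential=True):
    """Sketch of retry waits: with exponential backoff each attempt
    doubles the previous delay; otherwise the delay is constant."""
    delays = []
    for attempt in range(1, retries + 1):
        if exponential:
            delays.append(retry_delay * (2 ** (attempt - 1)))
        else:
            delays.append(retry_delay)
    return delays

# retry_delay of 5 minutes with 3 retries and backoff waits
# 5, 10, then 20 minutes between attempts:
waits = backoff_delays(timedelta(minutes=5), 3)
```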
I think you actually need to specify catchup at the DAG level, not pass it in through default_args. (The latter doesn't really make sense anyway, since those are the default arguments for the tasks. You couldn't have some tasks catch up and others not.)
Try this:
dag = DAG(
    'tutorial2', default_args=default_args,
    schedule_interval='* * * * *', catchup=False)