 

Apache Airflow: setting catchup to False is not working

I have a DAG created on Apache Airflow. It seems the scheduler is configured to run it from June 2015 onwards. (By the way, I do not know why: it is a new DAG I created and I didn't backfill it. I only backfilled another DAG with a different DAG ID over these date intervals, and the scheduler took those dates and backfilled my new DAG. I'm just starting to work with Airflow.)

(Update: I realized the DAG is backfilled because start_date is set in the DAG's default config, although this does not explain the behaviour I describe below.)

I'm trying to stop the scheduler from running all the DAG executions from that date. The airflow backfill --mark_success tutorial2 -s '2015-06-01' -e '2019-02-27' command gives me database errors (see below), so I'm trying to set catchup to False instead.

sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: job [SQL: 'INSERT INTO job (dag_id, state, job_type, start_date, end_date, latest_heartbeat, executor_class, hostname, unixname) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)'] [parameters: ('tutorial2', 'running', 'BackfillJob', '2019-02-27 10:52:37.281716', None, '2019-02-27 10:52:37.281733', 'SequentialExecutor', '08b6eb432df9', 'airflow')] (Background on this error at: http://sqlalche.me/e/e3q8)
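
(A side note on that error: no such table: job usually means the SQLite metadata database the CLI is pointing at was never initialized. In Airflow 1.x, running airflow initdb with the same AIRFLOW_HOME normally creates those tables, assuming the CLI and the scheduler actually share one database; whether that holds here depends on the container setup.)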

So I'm using another approach. What I've tried:

  1. Setting catchup_by_default = False in airflow.cfg and restarting the whole Docker container (the relevant airflow.cfg snippet is shown after this list).
  2. Setting catchup = False in my Python DAG file and running the file with python again.
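
For reference, a sketch of where that flag lives in a stock Airflow 1.x airflow.cfg (surrounding values depend on the install):

[scheduler]
# When False, DAGs default to catchup=False, so the scheduler does not create
# runs for past intervals; an explicit catchup argument on a DAG overrides this.
catchup_by_default = False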

What I'm seeing on the web UI:

DAG runs are being launched starting at June 2015 (screenshot: list of DAG runs starting in June 2015). Catchup is shown as False in the DAG's configuration (screenshot: DAG details with catchup set to False).

So I don't understand why those DAG runs are being launched.

Thank you

DAG code:

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'catchup': False,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial2', default_args=default_args, schedule_interval='* * * * *')

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
asked Feb 27 '19 by dhalfageme


People also ask

What is catchup in Airflow DAG?

An Airflow DAG defined with a start_date, possibly an end_date, and a non-dataset schedule defines a series of intervals which the scheduler turns into individual DAG runs and executes. With catchup enabled (the default), the scheduler also creates runs for every past interval since start_date that has not yet been run; with catchup disabled, only the latest interval is scheduled.
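
A minimal sketch of the effect (placeholder DAG id and dates, in the same Airflow 1.x style as the question):

from airflow import DAG
from datetime import datetime

# With catchup=True (the default) the scheduler would create one run per day
# for every interval since 2015-06-01; with catchup=False it only creates
# runs from the current interval onwards.
dag = DAG(
    'catchup_demo',
    start_date=datetime(2015, 6, 1),
    schedule_interval='@daily',
    catchup=False)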

How do you rerun a failed task in Airflow?

To rerun a task in Airflow you clear the task status to update the max_tries and current task instance state values in the metastore. After the task reruns, the max_tries value updates to 0, and the current task instance state updates to None.
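
With the Airflow 1.x CLI, clearing looks like this (DAG id, task regex, and dates are placeholders):

airflow clear tutorial2 -t print_date -s 2019-02-26 -e 2019-02-27

The cleared task instances are then picked up and rerun by the scheduler.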

How do you backfill data Airflow?

Backfilling can be accomplished in Airflow using the CLI. You simply specify the DAG ID, as well as the start date and end date for the backfill period. This command runs the DAG for all intervals between the start date and end date. DAGs in your backfill interval are still rerun even if they already have DAG runs.
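
In the Airflow 1.x CLI that looks like the following (dates are placeholders; this is the same command form the question uses):

airflow backfill -s 2015-06-01 -e 2015-06-07 tutorial2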

What is retry delay in Airflow?

retries (int) – the number of retries that should be performed before failing the task. retry_delay (datetime.timedelta) – the delay between retries. retry_exponential_backoff (bool) – allow progressively longer waits between retries by using an exponential backoff algorithm on the retry delay (the delay will be converted into seconds).
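
These are BaseOperator arguments, so they can be set per task or shared via default_args; a sketch with arbitrary values:

from datetime import timedelta

default_args = {
    'retries': 3,                          # give up after three retries
    'retry_delay': timedelta(minutes=5),   # wait 5 minutes between attempts
    'retry_exponential_backoff': True,     # stretch waits: ~5m, 10m, 20m, ...
}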


1 Answer

I think you actually need to specify catchup at the DAG level, not pass it in through default_args. (The latter doesn't really make sense anyway, since those are the default arguments for the tasks; you couldn't have some tasks catch up and others not.)

Try this:

dag = DAG(
    'tutorial2', default_args=default_args, schedule_interval='* * * * *', catchup=False)
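
Once catchup is set on the DAG itself, the 'catchup': False entry in default_args can be dropped: default_args only feeds task (operator) arguments, and a key like catchup that no operator accepts is silently ignored.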
answered Oct 19 '22 by santon