 

Airflow backfill clarification

Tags:

airflow

I'm just getting started with Airbnb's airflow, and I'm still not clear on how/when backfilling is done.

Specifically, there are 2 use-cases that confuse me:

  1. If I run airflow scheduler for a few minutes, stop it for a minute, then restart it again, my DAG seems to run extra tasks for the first 30 seconds or so, then it continues as normal (runs every 10 sec). Are these extra tasks "backfilled" tasks that weren't able to complete in an earlier run? If so, how would I tell airflow not to backfill those tasks?

  2. If I run airflow scheduler for a few minutes, then run airflow clear MY_tutorial, then restart airflow scheduler, it seems to run a TON of extra tasks. Are these tasks also somehow "backfilled" tasks? Or am I missing something?

Currently, I have a very simple dag:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2016, 10, 4),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 8)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

second_template = """
    touch ~/airflow/logs/test
    echo $(date) >> ~/airflow/logs/test
"""

t4 = BashOperator(
    task_id='write_test',
    bash_command=second_template,
    dag=dag)

t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)

The only two things I've changed in my airflow config are

  1. I changed from using a sqlite db to using a postgres db
  2. I'm using a CeleryExecutor instead of a SequentialExecutor (the relevant airflow.cfg lines are sketched just below this list)
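Concretely, those two changes amount to something like the following in airflow.cfg (the connection string is only a placeholder, not my real credentials):

[core]
# metadata DB switched from the default sqlite to postgres
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

# executor switched from SequentialExecutor to CeleryExecutor
executor = CeleryExecutor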

Thanks so much for your help!

asked Oct 05 '16 by diego_c

People also ask

How do you backfill data in Airflow?

Backfilling can be accomplished in Airflow using the CLI. You simply specify the DAG ID, as well as the start date and end date for the backfill period. This command runs the DAG for all intervals between the start date and end date. DAGs in your backfill interval are still rerun even if they already have DAG runs.
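For example, with the same pre-2.0 CLI used in the answer below, a one-week backfill might look like this (the dates are only illustrative):

airflow backfill MY_tutorial -s 2016-10-04 -e 2016-10-10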

What is catch up in Airflow?

The scheduler, by default, will kick off a DAG Run for any data interval that has not been run since the last data interval (or has been cleared). This concept is called Catchup.
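Catchup can be disabled per DAG. A minimal sketch, assuming an Airflow version new enough to expose the catchup argument on DAG (the owner, dates, and interval simply mirror the question):

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'me',
    'start_date': datetime(2016, 10, 4),
}

# catchup=False tells the scheduler not to create runs for the intervals
# missed between start_date and now; it only schedules runs going forward
dag = DAG(
    'MY_tutorial',
    default_args=default_args,
    schedule_interval=timedelta(seconds=10),
    catchup=False)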

What is depends on past in Airflow?

According to the official Airflow docs, the task instances directly upstream from the task need to be in a success state. Also, if you have set depends_on_past=True, the previous task instance needs to have succeeded (except if it is the first run for that task).
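A short sketch of where that flag lives, reusing the default_args pattern from the question (values are illustrative):

from datetime import datetime

default_args = {
    'owner': 'me',
    'start_date': datetime(2016, 10, 4),
    # With depends_on_past=True, a task instance is only scheduled once the
    # same task succeeded in the previous scheduled run (the first run is exempt)
    'depends_on_past': True,
}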

What is an Airflow DAG?

DAGs. In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.


1 Answer

When you change the scheduler toggle to "on" for a DAG, the scheduler will trigger a backfill of all dag run instances for which it has no status recorded, starting with the start_date you specify in your "default_args".

For example: If the start date was "2017-01-21" and you turned on the scheduling toggle at "2017-01-22T00:00:00" and your dag was configured to run hourly, then the scheduler will backfill 24 dag runs and then start running on the scheduled interval.

This is essentially what is happening in both of your questions. In #1, it is filling in the 3 missing runs from the 30 seconds during which the scheduler was turned off. In #2, it is filling in all of the DAG runs from start_date until "now".

There are 2 ways around this:

  1. Set the start_date to a date in the future so that it will only start scheduling dag runs once that date is reached (a sketch follows the example at the end of this answer). Note that if you change the start_date of a DAG, you must change the name of the DAG as well, due to the way the start date is stored in airflow's DB.
  2. Manually run backfill from the command line with the "-m" (--mark_success) flag, which tells airflow not to actually run the DAG, but just to mark it as successful in the DB.

e.g.

airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30 
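And for option 1, a minimal sketch of a future start_date (the date and new DAG name below are only illustrative):

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'me',
    # a start_date in the future: the scheduler creates no dag runs
    # until this date is reached
    'start_date': datetime(2017, 2, 1),
}

# renamed because the start_date changed (see the note in option 1)
dag = DAG(
    'MY_tutorial_v2',
    default_args=default_args,
    schedule_interval=timedelta(seconds=10))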
answered Sep 21 '22 by bricca