I'm just getting started with Airbnb's Airflow, and I'm still not clear on how and when backfilling is done.
Specifically, there are two use cases that confuse me:

1. If I run `airflow scheduler` for a few minutes, stop it for a minute, then restart it, my DAG seems to run extra tasks for the first 30 seconds or so before continuing as normal (running every 10 seconds). Are these extra tasks "backfilled" tasks that weren't able to complete in an earlier run? If so, how would I tell Airflow not to backfill those tasks?
2. If I run `airflow scheduler` for a few minutes, then run `airflow clear MY_tutorial`, then restart `airflow scheduler`, it seems to run a TON of extra tasks. Are these tasks also somehow "backfilled" tasks, or am I missing something?
Currently, I have a very simple DAG:

```python
from datetime import datetime, timedelta

from airflow import DAG
# Airflow 1.x import path; in Airflow 2 use `from airflow.operators.bash import BashOperator`
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2016, 10, 4),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'MY_tutorial', default_args=default_args,
    schedule_interval=timedelta(seconds=10))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 8)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

second_template = """
touch ~/airflow/logs/test
echo $(date) >> ~/airflow/logs/test
"""

t4 = BashOperator(
    task_id='write_test',
    bash_command=second_template,
    dag=dag)

# write_test -> print_date -> {sleep, templated}
t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)
```
The only two things I've changed in my Airflow config include using `CeleryExecutor` instead of `SequentialExecutor`.

Thanks so much for your help!
Backfilling can be done from the Airflow CLI (`airflow backfill` in Airflow 1.x, `airflow dags backfill` in Airflow 2). You specify the DAG ID plus the start and end dates of the backfill period, and the command runs the DAG for every schedule interval between those dates. DAG runs in the backfill interval are still rerun even if they already exist.
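As a rough illustration of which intervals a backfill covers, the sketch below enumerates the logical (execution) dates between a start and an end date for a fixed `timedelta` schedule like the one in the question. `backfill_intervals` is a hypothetical helper, not part of Airflow's API; it assumes both ends are inclusive and ignores cron schedules and time zones. It deliberately does not look at existing runs, mirroring the point that a backfill reruns intervals that already have DAG runs.

```python
from datetime import datetime, timedelta
from typing import List


def backfill_intervals(start: datetime, end: datetime,
                       interval: timedelta) -> List[datetime]:
    """Return the logical dates a backfill would cover (hypothetical helper).

    Assumes a fixed timedelta schedule and treats both start and end as
    inclusive. Existing runs are not consulted: every interval in the range
    is returned, i.e. scheduled to (re)run.
    """
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval
    return dates


# One minute of the question's 10-second schedule:
# 00:00:00, 00:00:10, ..., 00:01:00 -> 7 logical dates
runs = backfill_intervals(datetime(2016, 10, 4),
                          datetime(2016, 10, 4, 0, 1),
                          timedelta(seconds=10))
print(len(runs))  # 7
```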
The scheduler, by default, will kick off a DAG Run for any data interval that has not been run since the last data interval (or has been cleared). This concept is called Catchup.
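A minimal sketch of the catchup idea, assuming a fixed `timedelta` schedule: a run for a data interval only becomes due once that interval has fully elapsed, and intervals that already have a recorded run are skipped, while clearing a run makes it due again. `catchup_runs` and the `completed` set (standing in for Airflow's metadata database) are hypothetical. In real Airflow, this behavior can be switched off per DAG with `catchup=False` in the `DAG(...)` constructor, or globally with `catchup_by_default = False` in the `[scheduler]` section of `airflow.cfg`.

```python
from datetime import datetime, timedelta
from typing import List, Set


def catchup_runs(start: datetime, interval: timedelta, now: datetime,
                 completed: Set[datetime]) -> List[datetime]:
    """Logical dates the scheduler would still create runs for (sketch).

    A run with logical date d covers [d, d + interval) and is only due once
    d + interval <= now. Dates in `completed` are skipped; "clearing" a run
    corresponds to removing it from that set.
    """
    due = []
    current = start
    while current + interval <= now:
        if current not in completed:
            due.append(current)
        current += interval
    return due


start = datetime(2016, 10, 4)
# Hypothetical history: the first three 10-second intervals already ran.
done = {start + timedelta(seconds=s) for s in (0, 10, 20)}
missing = catchup_runs(start, timedelta(seconds=10),
                       start + timedelta(minutes=1), done)
# missing holds the logical dates 00:00:30, 00:00:40 and 00:00:50
```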
According to the official Airflow docs, a task instance can run only when the task instances directly upstream of it are in a success state. Also, if you have set `depends_on_past=True`, the previous task instance of the same task needs to have succeeded (except if it is the first run for that task).
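The two conditions above can be sketched as a single predicate. This is a simplification under stated assumptions: the `State` enum and `can_run` are hypothetical stand-ins (not Airflow's `airflow.utils.state`), and only Airflow's default `all_success` trigger rule is modeled; other trigger rules such as `all_done` or `one_success` would change the first check.

```python
from enum import Enum
from typing import Iterable, Optional


class State(Enum):
    """Hypothetical subset of task instance states."""
    SUCCESS = "success"
    FAILED = "failed"
    RUNNING = "running"
    NONE = "none"


def can_run(upstream_states: Iterable[State], depends_on_past: bool,
            previous_state: Optional[State]) -> bool:
    """Simplified readiness check for one task instance.

    previous_state is None when there is no earlier task instance of this
    task, i.e. this is its first run.
    """
    # Condition 1 (default all_success rule): every direct upstream succeeded.
    if not all(s is State.SUCCESS for s in upstream_states):
        return False
    # Condition 2: with depends_on_past, the previous run must have succeeded,
    # unless there is no previous run.
    if depends_on_past and previous_state is not None:
        return previous_state is State.SUCCESS
    return True
```

For the question's DAG, which sets `depends_on_past: False`, only the first check applies.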
In Airflow, a DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.
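To make "directed acyclic graph" concrete, the sketch below encodes the question's `set_upstream` calls as a plain mapping from each task ID to its direct upstream tasks and orders them with Python's standard `graphlib` module (Python 3.9+). This is only an illustration of the dependency structure, not how Airflow's scheduler works internally. If the dependencies contained a cycle, `static_order()` would raise `graphlib.CycleError`, which is exactly what "acyclic" rules out.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key maps a task_id to the set of task_ids directly upstream of it,
# mirroring the set_upstream calls in the question's DAG.
dependencies = {
    "print_date": {"write_test"},  # t1.set_upstream(t4)
    "sleep": {"print_date"},       # t2.set_upstream(t1)
    "templated": {"print_date"},   # t3.set_upstream(t1)
}

# A valid execution order: every task appears after all of its upstreams.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

`sleep` and `templated` have no dependency on each other, so either may come first; in Airflow they can run in parallel once `print_date` succeeds.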
When you switch the scheduler toggle to "on" for a DAG, the scheduler will trigger a backfill of all DAG run instances for which it has no status recorded, starting with the `start_date` you specify in your `default_args`.
For example, if the start date was "2017-01-21", you turned on the scheduling toggle at "2017-01-22T00:00:00", and your DAG was configured to run hourly, the scheduler will backfill 24 DAG runs and then start running on the scheduled interval.
This is essentially what is happening in both of your questions. In #1, it is filling in the runs that were missed while the scheduler was off (with a 10-second schedule, every 10 seconds of downtime leaves one missed run). In #2, it is filling in all of the DAG runs from `start_date` until "now".
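The arithmetic behind both cases is just the number of complete schedule intervals between `start_date` and the moment the scheduler catches up. The sketch below assumes a fixed `timedelta` schedule; `runs_to_catch_up` is a hypothetical helper, and for case #2 it treats 2017-01-22T14:28:30 (the end date in the answer's backfill command) as "now".

```python
from datetime import datetime, timedelta


def runs_to_catch_up(start: datetime, now: datetime,
                     interval: timedelta) -> int:
    """Number of complete schedule intervals between start and now."""
    # timedelta // timedelta is integer (floor) division
    return (now - start) // interval


# The hourly example: start 2017-01-21, toggled on at 2017-01-22T00:00:00.
hourly = runs_to_catch_up(datetime(2017, 1, 21), datetime(2017, 1, 22),
                          timedelta(hours=1))

# Case #2: start_date 2016-10-04 with the question's 10-second schedule.
tutorial = runs_to_catch_up(datetime(2016, 10, 4),
                            datetime(2017, 1, 22, 14, 28, 30),
                            timedelta(seconds=10))

print(hourly, tutorial)  # 24 955611
```

With four tasks per run in the question's DAG, that second figure explains the "TON" of extra tasks after `airflow clear`.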
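There are two ways around this. One is to run a backfill with the `-m` (mark success) flag, which records the DAG runs in the given range as succeeded without actually executing their tasks, so the scheduler no longer treats them as missing, e.g.: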
```
airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30
```
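If you need to issue this command from a script, a small builder keeps the version difference in one place: Airflow 1.x uses `airflow backfill`, while Airflow 2.x moved it to `airflow dags backfill` (which also accepts `-m`/`--mark-success`). `backfill_command` is a hypothetical helper; it only assembles the argument list and does not run anything.

```python
import shlex
from typing import List


def backfill_command(dag_id: str, start: str, end: str,
                     mark_success: bool = True,
                     airflow2: bool = True) -> List[str]:
    """Build (but do not run) the argv for an Airflow backfill.

    With mark_success, -m marks the runs as succeeded without executing
    their tasks.
    """
    if airflow2:
        cmd = ["airflow", "dags", "backfill"]
    else:
        cmd = ["airflow", "backfill"]
    if mark_success:
        cmd.append("-m")
    cmd += ["-s", start, "-e", end, dag_id]
    return cmd


cmd = backfill_command("MY_tutorial", "2016-10-04", "2017-01-22T14:28:30",
                       airflow2=False)
print(shlex.join(cmd))
# airflow backfill -m -s 2016-10-04 -e 2017-01-22T14:28:30 MY_tutorial
```

To actually execute it you could pass the list to `subprocess.run(cmd, check=True)`; building an argument list instead of a shell string avoids quoting problems with the dates.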