Airflow: re-execute the jobs of a DAG for the past n days on a daily basis

I have scheduled the execution of a DAG to run daily. It works perfectly for one day.

However, each day I would like to re-execute it not only for the current day {{ ds }} but also for the previous n days (say n = 7).

For example, in the next execution, scheduled to run on "2018-01-30", I would like Airflow not only to run the DAG with execution date "2018-01-30", but also to re-run the DAGs for all the previous days from "2018-01-23" to "2018-01-30".

Is there an easy way to "invalidate" the previous execution so that a backfill is run automatically?

asked Jan 29 '18 by lucacerone

People also ask

How do you rerun DAG in Airflow?

Clear all tasks. To rerun multiple DAGs, click Browse > DAG Runs, select the DAGs to rerun, and in the Actions list select Clear the state.

How do you rerun Airflow?

If you want to re-run a task in Airflow, the best way to do so is to press Clear or Delete (language depends on the Airflow version you're running), not Run . Hitting this will clear the state of your failed task and allow the scheduler to pick it back up and re-run it.

What is depends on past in Airflow?

According to the official Airflow docs, the task instances directly upstream from the task need to be in a success state. Also, if you have set depends_on_past=True, the previous task instance needs to have succeeded (except if it is the first run for that task).
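
As a minimal sketch of that flag (the dag_id, task_id, and callable below are invented for illustration), setting depends_on_past=True in the default_args means each day's task instance only runs once the same task succeeded in the previous scheduled run:

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(7),
    'depends_on_past': True,  # wait for the previous run of the same task to succeed
}

# dag_id is illustrative
dag = DAG(dag_id='depends_on_past_example', default_args=args, schedule_interval='@daily')

def do_work(ds, **kwargs):
    # 'ds' is the execution date passed in via the task context
    print('processing ' + ds)

PythonOperator(
    task_id='daily_task',
    python_callable=do_work,
    provide_context=True,
    dag=dag,
)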

How do I change the DAG schedule in Airflow?

To schedule a DAG, Airflow just looks at the last execution date and adds the schedule_interval. If that time has passed, it runs the DAG. You cannot simply update the start date. A simple way to do this is to edit your start_date and schedule_interval, rename your DAG (e.g. xxxx_v2.py), and redeploy it.
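
As a rough sketch of that scheduling rule (the dag_id and dates below are made up for illustration): with this configuration, the run covering execution date 2018-01-29 is triggered at about 2018-01-30 10:00, i.e. once one schedule_interval has elapsed past the execution date it covers.

from datetime import datetime
from airflow.models import DAG

# Illustrative DAG: the scheduler triggers the run for a given execution date
# once that execution date plus one schedule_interval has passed.
dag = DAG(
    dag_id='my_dag_v2',                  # renamed (e.g. *_v2) after changing the start date
    start_date=datetime(2018, 1, 23),
    schedule_interval='0 10 * * *',      # every day at 10:00
)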


1 Answer

You can dynamically generate tasks in a loop and pass the offset to your operator.

Here is an example using the PythonOperator.

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

from datetime import timedelta


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

# dag_id chosen for illustration
dag = DAG(
    dag_id='reprocess_last_n_days',
    default_args=args,
    schedule_interval='0 10 * * *',  # daily at 10:00
)

def check_trigger(execution_date, day_offset, **kwargs):
    # Work out which past day this task instance is responsible for.
    target_date = execution_date - timedelta(days=day_offset)
    # ... re-process the data for target_date here ...

# One task per offset: each daily run re-processes the previous 7 days.
for day_offset in range(1, 8):
    PythonOperator(
        task_id='task_offset_' + str(day_offset),
        python_callable=check_trigger,
        provide_context=True,
        dag=dag,
        op_kwargs={'day_offset': day_offset},
    )
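
With this pattern, each scheduled run creates seven task instances (offsets 1 to 7), and each one re-processes the data for its own target_date, so the trailing 7-day window is refreshed every day without having to clear old DAG runs or trigger a backfill. If the current day should also be covered, the loop can start at 0 instead of 1.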
answered Sep 23 '22 by Antoine Augusti