Airflow backfills and new dag runs

I have a DAG with "DAG runs" from 1 Jan 2015 until today, scheduled daily. The tasks in the DAG are not "past dependent", meaning that during a backfill they can be executed in any date order.

If I need to backfill a task in the DAG, I clear all its task instances (from today back to the past) using the UI; all DAG runs then switch to the "running" state and the task starts backfilling from 1 Jan 2015 to today. The tasks are time consuming, so even when executed in parallel by multiple threads/workers, the backfill takes a few days to finish.

The problem is that the scheduler won't add new "DAG runs" for tomorrow, the day after tomorrow, and so on until the backfill is finished, so we fail to calculate the new days' data on time. Is there any way to prioritize tasks for new days as they come, and to resume backfilling once the tasks for the new day have finished?

P.S. Backfilling can also be done with the "airflow backfill" CLI, but that approach has its own problems, so for now I'm interested in the backfilling technique described above.
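
One scheduler lever that is relevant here, shown as a minimal sketch (not from the original question; the DAG ids, pool name, and load_day callable are all hypothetical): when tasks compete for slots in the same Airflow pool, free slots go to queued tasks in descending priority_weight order. So if the work is split between an ongoing DAG and a backfill DAG, as the accepted answer below does, the ongoing DAG's tasks can be made to jump the queue:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def load_day(**context):
    # Placeholder for the real, time-consuming per-day load.
    pass

ongoing = DAG('my_dag_ongoing',
              start_date=datetime(2018, 8, 1),
              schedule_interval='@daily')

backfill = DAG('my_dag_backfill',
               start_date=datetime(2015, 1, 1),
               end_date=datetime(2018, 7, 31),
               schedule_interval='@daily',
               catchup=True)

# Both tasks draw slots from the same pool; within a pool the scheduler
# hands out free slots in descending priority_weight order, so new days
# always run before queued backfill dates.
for dag, weight in ((ongoing, 100), (backfill, 1)):
    PythonOperator(task_id='load_day',
                   python_callable=load_day,
                   provide_context=True,
                   pool='db_pool',
                   priority_weight=weight,
                   dag=dag)

The pool (here db_pool) has to be created first under Admin -> Pools, and its slot count caps how many of these tasks run concurrently; the weighting only decides who gets the next free slot.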

asked Jul 31 '18 by Aleksandr Borisov

1 Answer

Similar to the comment on your question, the way I solved this as a workaround when I was backfilling a large database was to have a DAG generator create three DAGs (two backfill and one ongoing) based on the connection_created_on and start_date values.

The ongoing DAG runs hourly and begins at midnight on the same day as the connection_created_on value. The two backfill DAGs then pull data daily, starting on the first of the current month, and monthly, starting with the first month of the start_date. In this case, I knew we would always want to start on the first of the month, and that up to a month of data was small enough to be pulled in one go, so I split the work into these three DAG types for expediency.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def create_dag(dag_id,
               schedule,
               db_conn_id,
               default_args,
               catchup=False,
               max_active_runs=3):
    """Build one DAG; the real transfer tasks would hang off kick_off_dag."""
    dag = DAG(dag_id,
              default_args=default_args,
              schedule_interval=schedule,
              catchup=catchup,
              max_active_runs=max_active_runs)
    with dag:
        kick_off_dag = DummyOperator(task_id='kick_off_dag')

    return dag

db_conn_id = 'my_first_db_conn'
connection_created_on = '2018-05-17T12:30:54.271Z'

hourly_id = '{}_to_redshift_hourly'.format(db_conn_id)
daily_id = '{}_to_redshift_daily_backfill'.format(db_conn_id)
monthly_id = '{}_to_redshift_monthly_backfill'.format(db_conn_id)

# Earliest data we care about, snapped to the first of its month.
start_date = '2005-01-01T00:00:00.000Z'
start_date = datetime.strptime(start_date, '%Y-%m-%dT%H:%M:%S.%fZ')
start_date = datetime(start_date.year, start_date.month, 1)

cco_datetime = datetime.strptime(connection_created_on, '%Y-%m-%dT%H:%M:%S.%fZ')
hourly_start_date = datetime(cco_datetime.year, cco_datetime.month, cco_datetime.day)
daily_start_date = hourly_start_date - timedelta(days=(cco_datetime.day - 1))
daily_end_date = hourly_start_date - timedelta(days=1)
monthly_start_date = start_date if start_date else hourly_start_date - timedelta(days=365 + cco_datetime.day - 1)
monthly_end_date = daily_start_date

globals()[hourly_id] = create_dag(hourly_id,
                                  '@hourly',
                                  db_conn_id,
                                  {'start_date': hourly_start_date,
                                   'retries': 2,
                                   'retry_delay': timedelta(minutes=5),
                                   'email': [],
                                   'email_on_failure': True,
                                   'email_on_retry': False},
                                  catchup=True,
                                  max_active_runs=1)

globals()[daily_id] = create_dag(daily_id,
                                 '@daily',
                                 db_conn_id,
                                 {'start_date': daily_start_date,
                                  'end_date': daily_end_date,
                                  'retries': 2,
                                  'retry_delay': timedelta(minutes=5),
                                  'email': [],
                                  'email_on_failure': True,
                                  'email_on_retry': False},
                                 catchup=True)

globals()[monthly_id] = create_dag(monthly_id,
                                   '@monthly',
                                   db_conn_id,
                                   {'start_date': monthly_start_date,
                                    'end_date': monthly_end_date,
                                    'retries': 2,
                                    'retry_delay': timedelta(minutes=5),
                                    'email': [],
                                    'email_on_failure': True,
                                    'email_on_retry': False},
                                   catchup=True)
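
For concreteness, with the example connection_created_on above (2018-05-17) the boundaries work out so that the monthly backfill covers the span from start_date up to the first of the current month, the daily backfill covers 2018-05-01 through 2018-05-16, and the hourly DAG takes over at midnight on 2018-05-17. The two backfill DAGs can then run flat out while the hourly DAG keeps producing the current day's data on time.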
answered Sep 27 '22 by Ben Gregory