 

How can you re-run an upstream task if a downstream task fails in Airflow (using SubDAGs)?

I have an Airflow DAG that extracts data and then validates it. If the validation fails, it needs to re-run the extract. If the validation succeeds, it continues.

I've read people saying that sub DAGs can solve this problem, but I can't find any example of it. I've tried using a sub DAG, but I run into the same problem as when trying to do it in one DAG.

How can I get all tasks in the Sub DAG to re-run if one of them fails?

I have the following DAG/sub dag details:

maindag.py

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': start_date,
  'retries': 3,
  'retry_delay': timedelta(minutes=5),
  'sla': timedelta(hours=sla_hours)
}

main_dag = DAG(
  dag_id,
  default_args=default_args,
  schedule_interval='30 14 * * *',
  max_active_runs=1,
  concurrency=1)

task1 = BashOperator(...)

task2 = SubDagOperator(
  task_id=sub_dag_task_id,
  subdag=sub_dag(dag_id, sub_dag_task_id, start_date, main_dag.schedule_interval),
  dag=main_dag)

task3 = BashOperator(...)

subdag.py

def sub_dag(parent_dag_name, task_id, start_date, schedule_interval):
  dag = DAG(
    '%s.%s' % (parent_dag_name, task_id),
    schedule_interval=schedule_interval,
    start_date=start_date,
    )

  task1 = BashOperator(...)

  task2 = BashOperator(...)

  task3 = BashOperator(...)

  task1 >> task2 >> task3

  return dag

In the sub DAG, if task3 fails, I want task1 to run again even though it has succeeded. Why is this so hard to do??!

asked Feb 27 '18 by Alistair McIntyre

1 Answer

I've found a solution to this by adding an on_retry_callback to the SubDagOperator task in the main DAG:

(original source: https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117 )

from airflow.models import DagBag

def callback_subdag_clear(context):
    """Clears a subdag's tasks on retry."""
    dag_id = "{}.{}".format(
        context['dag'].dag_id,
        context['ti'].task_id
    )
    execution_date = context['execution_date']
    sdag = DagBag().get_dag(dag_id)
    sdag.clear(
        start_date=execution_date,
        end_date=execution_date,
        only_failed=False,
        only_running=False,
        confirm_prompt=False,
        include_subdags=False)
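To see how the callback derives the subdag's dag_id from the task context, here is a minimal, Airflow-free sketch. The helper name and the stand-in context objects are illustrative, not part of the original code; Airflow's real callback context carries full DAG and TaskInstance objects.

```python
from types import SimpleNamespace

def build_subdag_id(context):
    """Mirror the dag_id construction in callback_subdag_clear:
    a subdag's dag_id is '<parent_dag_id>.<subdag_task_id>'."""
    return "{}.{}".format(context['dag'].dag_id, context['ti'].task_id)

# Stand-in context mimicking what Airflow passes to on_retry_callback.
fake_context = {
    'dag': SimpleNamespace(dag_id='maindag'),
    'ti': SimpleNamespace(task_id='my_subdag_task'),
}
print(build_subdag_id(fake_context))  # maindag.my_subdag_task
```

This naming convention is why the sub DAG in subdag.py must be created with `'%s.%s' % (parent_dag_name, task_id)` as its dag_id: the callback relies on it to look the sub DAG up in the DagBag.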

Then, on the task that runs the SubDagOperator, I set:

on_retry_callback=callback_subdag_clear,

It now clears the task instance history of every task in the sub DAG and re-runs them all, up to the number of retries configured in the main DAG.
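Putting it together, the SubDagOperator from maindag.py with the callback wired in would look something like the sketch below. This is a DAG-definition fragment based on the operator call shown in the question, not a complete file:

```python
task2 = SubDagOperator(
    task_id=sub_dag_task_id,
    subdag=sub_dag(dag_id, sub_dag_task_id, start_date,
                   main_dag.schedule_interval),
    # On each retry of this task, clear the whole sub DAG run
    # so task1 >> task2 >> task3 all execute again from scratch.
    on_retry_callback=callback_subdag_clear,
    dag=main_dag)
```

Note that the callback only fires on a retry, so the SubDagOperator task (or the default_args it inherits) must have retries greater than zero for this to work.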

answered Nov 05 '22 by Alistair McIntyre