I have an Airflow DAG that extracts data and performs validation. If the validation fails, it needs to re-run the extract. If the validation succeeds, it continues.
I've read that sub DAGs can solve this problem, but I can't find any example of this. I tried using a sub DAG, but ran into the same problem as when trying to do it in a single DAG.
How can I get all tasks in the Sub DAG to re-run if one of them fails?
I have the following DAG/sub dag details:
maindag.py
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_date,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'sla': timedelta(hours=sla_hours)
}

main_dag = DAG(
    dag_id,
    default_args=default_args,
    schedule_interval='30 14 * * *',
    max_active_runs=1,
    concurrency=1)

task1 = BashOperator(...)

task2 = SubDagOperator(
    task_id=sub_dag_task_id,
    subdag=sub_dag(dag_id, sub_dag_task_id, start_date, main_dag.schedule_interval),
    dag=main_dag)

task3 = BashOperator(...)
subdag.py
def sub_dag(parent_dag_name, task_id, start_date, schedule_interval):
    dag = DAG(
        '%s.%s' % (parent_dag_name, task_id),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    task1 = BashOperator(...)
    task2 = BashOperator(...)
    task3 = BashOperator(...)

    task1 >> task2 >> task3

    return dag
In the sub dag, if task 3 fails, I want task 1 to run again even though it has succeeded. Why is this so hard to do??!
I've found a solution to this by creating an on-retry callback function in the main DAG:
(original source: https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117 )
from airflow.models import DagBag

def callback_subdag_clear(context):
    """Clears a subdag's tasks on retry."""
    dag_id = "{}.{}".format(
        context['dag'].dag_id,
        context['ti'].task_id
    )
    execution_date = context['execution_date']
    sdag = DagBag().get_dag(dag_id)
    sdag.clear(
        start_date=execution_date,
        end_date=execution_date,
        only_failed=False,
        only_running=False,
        confirm_prompt=False,
        include_subdags=False)
Then the task that runs the SubDagOperator sets:
on_retry_callback=callback_subdag_clear,
It now clears out the task instance history of each task and re-runs each task in the sub dag up to the number of retries in the main dag.
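The callback only finds the sub DAG because it rebuilds the id in the same '<parent dag id>.<task id>' form that sub_dag() uses when it creates the DAG. A minimal sketch of just that lookup step, runnable without Airflow: the helper name subdag_id_from_context and the ids "main_dag" and "extract_and_validate" are made up for illustration, and the SimpleNamespace objects only stand in for the real context values.

```python
from types import SimpleNamespace


def subdag_id_from_context(context):
    """Rebuild '<parent_dag_id>.<task_id>' from a callback context (hypothetical helper)."""
    return "{}.{}".format(context["dag"].dag_id, context["ti"].task_id)


# Stand-in objects: in Airflow these come from the real callback context.
fake_context = {
    "dag": SimpleNamespace(dag_id="main_dag"),
    "ti": SimpleNamespace(task_id="extract_and_validate"),
}

# Same naming scheme as the '%s.%s' % (parent_dag_name, task_id) in subdag.py.
print(subdag_id_from_context(fake_context))  # main_dag.extract_and_validate
```

If the sub DAG were given an id that does not follow this pattern, the get_dag() lookup in the callback would not find it, so the two have to stay in sync.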