i have a task in an airflow DAG. it has three child tasks. unfortunately, there are cases where this parent task will succeed, but two of the three children will fail (and a retry on the children won't fix them).
it requires the parent to retry (even though it didn't fail).
so i dutifully go into the graph view of the dag run and 'clear' this parent task and all downstream tasks (+recursive).
is there a way i can do this within the dag itself?
If your tasks are part of a subdag, calling dag.clear() in the on_retry_callback of a SubDagOperator should do the trick:
SubDagOperator(
subdag=subdag,
task_id="...",
on_retry_callback=lambda context: subdag.clear(
start_date=context['execution_date'],
end_date=context['execution_date']),
dag=dag
)
We opted for using the clear_task_instances method of the taskinstance:
@provide_session
def clear_tasks_fn(tis,session=None,activate_dag_runs=False,dag=None) -> None:
"""
Wrapper for `clear_task_instances` to be used in callback function
(that accepts only `context`)
"""
taskinstance.clear_task_instances(tis=tis,
session=session,
activate_dag_runs=activate_dag_runs,
dag=dag)
def clear_tasks_callback(context) -> None:
"""
Clears tasks based on list passed as `task_ids_to_clear` parameter
To be used as `on_retry_callback`
"""
all_tasks = context["dag_run"].get_task_instances()
dag = context["dag"]
task_ids_to_clear = context["params"].get("task_ids_to_clear", [])
tasks_to_clear = [ ti for ti in all_tasks if ti.task_id in task_ids_to_clear ]
clear_tasks_fn(tasks_to_clear,
dag=dag)
You would need to provide the list of tasks you want cleared on the callback, e.g on any child task:
DummyOperator('some_child',
on_retry_callback=clear_tasks_callback,
params=dict(
task_ids_to_clear=['some_child', 'parent']
),
retries=1
)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With