Is it possible to make an Airflow DAG fail if any task fails?
I usually have some cleaning up tasks at the end of a DAG and as it is now, whenever the last task succeeds the whole DAG is marked as a success.
Basically, a trigger rule defines the condition under which a task gets triggered. By default, every task has the trigger rule all_success, which means the task runs only if all of its upstream tasks succeed. Since the final state of a DAG run reflects its leaf tasks, a cleanup task that runs and succeeds at the end can mark the whole run as a success even when earlier tasks failed.
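For instance, a cleanup task is typically given a more permissive trigger rule so that it still runs after an upstream failure. A minimal sketch, assuming Airflow 1.x-style imports and a dag object as in the answer below; the task name and command are illustrative:

from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule

cleanup = BashOperator(
    task_id='cleanup',
    bash_command='rm -rf /tmp/my_pipeline_workdir',  # hypothetical cleanup command
    trigger_rule=TriggerRule.ALL_DONE,  # run regardless of upstream success or failure
    dag=dag,
)

A cleanup task like this succeeds even after an upstream failure, which is exactly why the DAG run ends up marked as a success.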
Another solution is to add a final PythonOperator that checks the status of all tasks in this run:
from airflow.operators.python_operator import PythonOperator
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule

def final_status(**kwargs):
    # Fail this task (and with it the DAG run) if any other task in the run did not succeed
    for task_instance in kwargs['dag_run'].get_task_instances():
        if task_instance.current_state() != State.SUCCESS and \
                task_instance.task_id != kwargs['task_instance'].task_id:
            raise Exception("Task {} failed. Failing this DAG run".format(task_instance.task_id))

final_status = PythonOperator(
    task_id='final_status',
    provide_context=True,
    python_callable=final_status,
    trigger_rule=TriggerRule.ALL_DONE,  # Ensures this task runs even if upstream fails
    dag=dag,
)
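For this check to actually run last, final_status has to be set downstream of every other task in the DAG, for example (the upstream task names here are illustrative):

# hypothetical tasks in the DAG; final_status must depend on all of them
[extract_task, transform_task, cleanup_task] >> final_status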
I'm facing a similar problem. It is not a bug, but it would be a nice feature to add this property to the DAG.
As a workaround, you can push an XCom variable from the task that is allowed to fail, and in the downstream tasks do something like:
if ti.xcom_pull(key='state', task_ids=task_allowed_to_fail_id) == 'FAILED':
    raise ValueError('Force failure because upstream task has failed')
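A minimal sketch of how the upstream task might record its failure, assuming Airflow 1.x-style imports and a dag object; the task ID, the 'state' key, and the risky_step() helper are illustrative, and here the task catches its own error so that only the downstream check fails:

from airflow.operators.python_operator import PythonOperator

def run_allowed_to_fail(**context):
    try:
        risky_step()  # hypothetical work that may raise
        context['ti'].xcom_push(key='state', value='SUCCESS')
    except Exception:
        # record the failure for the downstream check instead of failing this task
        context['ti'].xcom_push(key='state', value='FAILED')

task_allowed_to_fail = PythonOperator(
    task_id='task_allowed_to_fail_id',  # the downstream xcom_pull above would reference this task_id
    provide_context=True,
    python_callable=run_allowed_to_fail,
    dag=dag,
)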