 

Airflow - run task regardless of upstream success/fail

Tags:

python

airflow

I have a DAG which fans out to multiple independent units in parallel. This runs in AWS, so we have tasks which scale our AutoScalingGroup up to the maximum number of workers when the DAG starts, and to the minimum when the DAG completes. The simplified version looks like this:

           | - - taskA - - |
           |               |
scaleOut - | - - taskB - - | - scaleIn
           |               |
           | - - taskC - - |

However, some of the tasks in the parallel set fail occasionally, and I can't get the scaleIn task to run when any of the A-C tasks fail.

What's the best way to have a task execute at the end of the DAG, once all other tasks have completed (success or fail)? The depends_on_upstream setting sounded like what we needed, but didn't actually do anything in our testing.

asked Jun 05 '17 by J. Doe

People also ask

What is upstream failed in Airflow?

An upstream failure stops downstream tasks from being executed with the default trigger rule 'all_success', which requires all upstream tasks to be successful. Note that Airflow does continue executing tasks which do not have any dependency on the failed task (fetch_weather and process_weather).
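To see this in a runnable form, here is a minimal sketch (task names, the DAG id, and the Airflow 1.x-era import path are illustrative, not from the question):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

dag = DAG("upstream_failure_demo", start_date=datetime(2017, 6, 5), schedule_interval=None)

# 'exit 1' makes this task fail every run.
failing = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)

# Default trigger_rule is 'all_success', so once failing_task fails
# this task is marked 'upstream_failed' and never executes.
blocked = BashOperator(task_id="blocked_task", bash_command="echo never runs", dag=dag)

# No dependency on failing_task, so this one still runs normally.
independent = BashOperator(task_id="independent_task", bash_command="echo still runs", dag=dag)

failing >> blocked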

How do I run a failed task in Airflow?

The easiest way to rerun a task in Airflow is to clear the task status. Doing so updates two values in the metastore, causing the task to rerun: max_tries updates to 0 , and the current task instance state updates to None .
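The same thing can be done from the command line. A hedged example for the Airflow 1.x-era CLI (the DAG id, task id, and dates are placeholders; in Airflow 2 the subcommand moved to "airflow tasks clear"):

airflow clear my_dag -t failing_task -s 2017-06-05 -e 2017-06-05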

What is trigger rule in Airflow?

Basically, a trigger rule defines why a task gets triggered, on which condition. By default, all tasks have the same trigger rule all_success set which means, if all parents of a task succeed, then the task gets triggered.
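Instead of passing the rule as a raw string, you can use the named constants from airflow.utils.trigger_rule, which guards against typos. A minimal sketch (the DAG id, task id, and Airflow 1.x-era import paths are illustrative):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator  # Airflow 1.x import path
from airflow.utils.trigger_rule import TriggerRule

dag = DAG("trigger_rule_constants", start_date=datetime(2017, 6, 5), schedule_interval=None)

# Same effect as trigger_rule="all_done", but using the named constant.
cleanup = DummyOperator(
    task_id="cleanup",
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag,
)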

What is base operator in Airflow?

Abstract base class for all operators. Since operators create objects that become nodes in the dag, BaseOperator contains many recursive methods for dag crawling behavior. To derive this class, you are expected to override the constructor as well as the 'execute' method.
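A minimal subclass following that pattern might look like the sketch below; HelloOperator and its name parameter are invented for illustration, and the apply_defaults decorator reflects Airflow 1.x conventions (it is no longer needed in Airflow 2):

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults  # required in Airflow 1.x

class HelloOperator(BaseOperator):
    """Toy operator overriding the constructor and execute(), as described above."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(HelloOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # Called when the task instance runs; context carries runtime
        # information such as the execution date.
        self.log.info("Hello, %s", self.name)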


1 Answer

All operators have a trigger_rule argument which can be set to 'all_done', causing the task to be triggered regardless of the failure or success of the previous task(s).

You could set the trigger rule for the task you want to run to 'all_done' instead of the default 'all_success'.

A simple bash operator task with that argument would look like:

task = BashOperator(
    task_id="hello_world",
    bash_command="echo Hello World!",
    trigger_rule="all_done",
    dag=dag,
)
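Applied to the DAG in the question, that means giving the scale-in task trigger_rule="all_done" so it fires once taskA-taskC have all finished, whether they succeeded or failed. A sketch of the full wiring (the DAG id, bash commands, and Airflow 1.x-era import path are illustrative):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

dag = DAG("scaling_dag", start_date=datetime(2017, 6, 5), schedule_interval=None)

scale_out = BashOperator(task_id="scaleOut", bash_command="echo scaling out", dag=dag)
task_a = BashOperator(task_id="taskA", bash_command="echo A", dag=dag)
task_b = BashOperator(task_id="taskB", bash_command="echo B", dag=dag)
task_c = BashOperator(task_id="taskC", bash_command="echo C", dag=dag)

# 'all_done' fires once every upstream task has finished,
# regardless of success or failure.
scale_in = BashOperator(
    task_id="scaleIn",
    bash_command="echo scaling in",
    trigger_rule="all_done",
    dag=dag,
)

for task in [task_a, task_b, task_c]:
    scale_out >> task >> scale_in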
answered Sep 28 '22 by Nick