 

How to create a conditional task in Airflow

I would like to create a conditional task in Airflow as described in the schema below. The expected scenario is the following:

  • Task 1 executes
  • If Task 1 succeeds, then execute Task 2a
  • Else, if Task 1 fails, execute Task 2b
  • Finally execute Task 3

(Image: Conditional Task)

All tasks above are SSHExecuteOperator tasks. I'm guessing I should use the ShortCircuitOperator and/or XCom to manage the condition, but I am not clear on how to implement that. Could you please describe the solution?

Alexis.Rolland asked Apr 28 '17 10:04



People also ask

How do I set dependencies between tasks in Airflow?

Basic dependencies between Airflow tasks can be set in two ways: using the bitshift operators (<< and >>), or using the set_upstream and set_downstream methods.

How do you do branching in Airflow?

One of the simplest ways to implement branching in Airflow is to use the BranchPythonOperator. Like the PythonOperator, the BranchPythonOperator takes a Python function as an input. However, the BranchPythonOperator's input function must return a task ID (or a list of task IDs) that the DAG should proceed with based on some logic.


1 Answer

Airflow has a BranchPythonOperator that can be used to express the branching dependency more directly.

The docs describe its use:

The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id. The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.

...

If you want to skip some tasks, keep in mind that you can’t have an empty path, if so make a dummy task.

Code Example

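    # Import paths below are the Airflow 1.x ones; in Airflow 2 the operators
    # moved (e.g. airflow.operators.python). `dag` is assumed to be defined.
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator


    def dummy_test():
        # Always follow the 'branch_a' path; 'branch_false' gets skipped.
        return 'branch_a'


    A_task = DummyOperator(task_id='branch_a', dag=dag)
    B_task = DummyOperator(task_id='branch_false', dag=dag)

    branch_task = BranchPythonOperator(
        task_id='branching',
        python_callable=dummy_test,
        dag=dag,
    )

    # Both tasks are directly downstream of the branching task.
    branch_task >> A_task
    branch_task >> B_task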
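For the scenario in the question, the branch callable would need to decide between the two paths at run time instead of returning a fixed ID. The following is only a minimal sketch under stated assumptions: the task IDs `task_1`, `task_2a` and `task_2b` are hypothetical names for the tasks in the question, and Task 1 is assumed to push a truthy XCom value when it succeeds (rather than failing outright, in which case a downstream branching task would not run with the default trigger rule). `FakeTaskInstance` is a stand-in so the sketch runs without Airflow installed; inside a real DAG, Airflow supplies the task instance as `ti` in the context (on Airflow 1.x this needs `provide_context=True` on the operator).

```python
# Hypothetical task IDs, named after the tasks in the question.
TASK_1 = "task_1"
TASK_2A = "task_2a"
TASK_2B = "task_2b"


def choose_branch(**context):
    """Return the task_id of the branch to follow.

    Assumption: task_1 pushed a truthy XCom value on success.
    """
    ti = context["ti"]  # the running task instance, supplied by Airflow
    task_1_ok = ti.xcom_pull(task_ids=TASK_1)
    return TASK_2A if task_1_ok else TASK_2B


class FakeTaskInstance:
    """Stand-in for Airflow's TaskInstance so the sketch runs on its own."""

    def __init__(self, value):
        self._value = value

    def xcom_pull(self, task_ids=None, key="return_value"):
        # Ignores its arguments and returns the canned value.
        return self._value


if __name__ == "__main__":
    print(choose_branch(ti=FakeTaskInstance(True)))   # success path
    print(choose_branch(ti=FakeTaskInstance(None)))   # no value pushed
```

In a DAG, `choose_branch` would be passed as the `python_callable` of the BranchPythonOperator, with the tasks for both returned IDs set directly downstream of it.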

EDIT:

If you're running Airflow 1.10.3 or later, the callable can also return a list of task IDs. This lets a single operator skip multiple downstream paths, and you no longer need a dummy task before joining.
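To illustrate the list form, here is a small plain-Python sketch. The task IDs and the `pick_branches` callable are hypothetical, and `split_paths` is only a toy model of the follow/skip behaviour described in the docs quote above, not Airflow's implementation.

```python
def pick_branches(notify_enabled):
    """Branch callable that returns a list of task_ids to follow."""
    branches = ["task_2a"]
    if notify_enabled:
        branches.append("notify")
    return branches


def split_paths(direct_downstream, returned):
    """Toy model: which directly downstream tasks are followed or skipped."""
    # Accept either a single task_id or a list of task_ids.
    chosen = [returned] if isinstance(returned, str) else list(returned)
    followed = [t for t in direct_downstream if t in chosen]
    skipped = [t for t in direct_downstream if t not in chosen]
    return followed, skipped


if __name__ == "__main__":
    downstream = ["task_2a", "task_2b", "notify"]
    print(split_paths(downstream, pick_branches(True)))
    print(split_paths(downstream, "task_2b"))
```

Returning a single string and returning a one-element list are treated the same way in this model; every directly downstream task that is not named ends up in the skipped group.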

villasv answered Sep 16 '22 11:09
