I would like to create a conditional task in Airflow as described in the schema below. The expected scenario is the following:
All tasks in the schema are SSHExecuteOperators. I'm guessing I should use the ShortCircuitOperator and/or XCom to manage the condition, but I am not clear on how to implement that. Could you please describe the solution?
Basic dependencies between Airflow tasks can be set in two ways: using the bitshift operators (<< and >>), or using the set_upstream and set_downstream methods.
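For example, the two forms below are equivalent (the task names are illustrative):

# using bitshift operators
task_1 >> task_2 >> task_3

# using the explicit methods
task_1.set_downstream(task_2)
task_2.set_downstream(task_3)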
One of the simplest ways to implement branching in Airflow is to use the BranchPythonOperator. Like the PythonOperator, the BranchPythonOperator takes a Python function as an input. However, the BranchPythonOperator's input function must return the task ID (or, in newer versions, a list of task IDs) that the DAG should proceed with, based on some logic.
Airflow has a BranchPythonOperator that can be used to express the branching dependency more directly.
The docs describe its use:
The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id. The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.
...
If you want to skip some tasks, keep in mind that you can't have an empty path; if so, make a dummy task.
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG('branch_example', start_date=datetime(2019, 1, 1), schedule_interval=None)

def dummy_test():
    # always choose the 'branch_a' path; replace with your real condition
    return 'branch_a'

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag,
)

branch_task >> A_task
branch_task >> B_task
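In your case, instead of a hard-coded return value, the branch callable can inspect a value pushed to XCom by one of the upstream SSH tasks. A minimal sketch follows; the task_id 'check_task' and the 'OK' check are placeholders, and whether (and how) your SSH operator pushes its output to XCom depends on the operator and Airflow version you're using:

def choose_branch(**kwargs):
    # pull whatever the upstream task pushed to XCom and decide which path to follow
    result = kwargs['ti'].xcom_pull(task_ids='check_task')  # 'check_task' is a placeholder
    if result and 'OK' in result:
        return 'branch_a'
    return 'branch_false'

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
    provide_context=True,  # Airflow 1.x: makes 'ti' available in kwargs
    dag=dag,
)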
If you're using Airflow version >= 1.10.3, the callable can also return a list of task IDs, allowing you to skip multiple downstream paths with a single operator, and you don't need a dummy task before joining.
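A minimal sketch, assuming Airflow >= 1.10.3 and two direct downstream tasks named 'branch_a' and 'branch_b':

def pick_branches():
    # both returned task IDs will run; any other tasks directly downstream
    # of the branching task are skipped
    return ['branch_a', 'branch_b']

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=pick_branches,
    dag=dag,
)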