How does Airflow's BranchPythonOperator work?

I'm struggling to understand how BranchPythonOperator in Airflow works. I know it's primarily used for branching, but the documentation leaves me confused about what to pass into the task and what I need to pass to, or expect from, the upstream task.

Given the simple example in the documentation on this page, what would the source code look like for the upstream task called run_this_first and the 2 downstream ones that are branched? How exactly does Airflow know to run branch_a instead of branch_b? Where does the upstream task's output get noticed/read?

asked Jun 13 '17 15:06 by simplycoding


People also ask

How do you do branching in Airflow?

One of the simplest ways to implement branching in Airflow is to use the BranchPythonOperator. Like the PythonOperator, the BranchPythonOperator takes a Python function as an input. However, the BranchPythonOperator's input function must return a task ID, or a list of task IDs, that the DAG should proceed with based on some logic.

What is DummyOperator in Airflow?

class airflow.operators.dummy.DummyOperator(**kwargs): Operator that does literally nothing. It can be used to group tasks in a DAG. The task is evaluated by the scheduler but never processed by the executor.

What is PythonVirtualenvOperator?

PythonVirtualenvOperator. Allows one to run a function in a virtualenv that is created and destroyed.


1 Answer

Your BranchPythonOperator is created with a python_callable, which is a function. Based on your business logic, that function returns the task_id(s) of the immediately downstream tasks that should run, out of the ones you have connected. This can be anywhere from 1 to N of the tasks immediately downstream. There is nothing that the downstream tasks HAVE to read; however, you could pass them metadata using XCom.

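    # Airflow 2.x import path; in Airflow 1.x the module is airflow.operators.python_operator
    from airflow.operators.python import BranchPythonOperator

    def decide_which_path():
        # `something` is a placeholder for your own business logic.
        if something is True:
            return "branch_a"
        else:
            return "branch_b"

    # `dag`, `branch_a` and `branch_b` are assumed to be defined elsewhere in the DAG file.
    branch_task = BranchPythonOperator(
        task_id='run_this_first',
        python_callable=decide_which_path,
        trigger_rule="all_done",
        dag=dag)

    # The returned string must match the task_id of one of these downstream tasks.
    branch_task.set_downstream(branch_a)
    branch_task.set_downstream(branch_b)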
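To make the "how does Airflow know" part concrete, here is a rough plain-Python sketch of the selection step. It is not Airflow's actual implementation: `simulate_branch`, the state strings and the `ValueError` check are my own hypothetical names and choices, used only to illustrate that the callable's return value is compared against the task_ids of the immediately downstream tasks, and the ones that were not returned end up skipped.

```python
from typing import Callable, Dict, Iterable, List, Union


def simulate_branch(
    python_callable: Callable[[], Union[str, Iterable[str]]],
    downstream_task_ids: List[str],
) -> Dict[str, str]:
    """Return an illustrative state for each immediately downstream task."""
    chosen = python_callable()
    # A single task_id is normalised to a one-element set.
    chosen_ids = {chosen} if isinstance(chosen, str) else set(chosen)
    # Assumption for this sketch: reject ids that are not directly downstream.
    unknown = chosen_ids - set(downstream_task_ids)
    if unknown:
        raise ValueError(f"not directly downstream: {sorted(unknown)}")
    return {
        task_id: "scheduled" if task_id in chosen_ids else "skipped"
        for task_id in downstream_task_ids
    }


def decide_which_path() -> str:
    something = True  # placeholder for real business logic
    return "branch_a" if something else "branch_b"


states = simulate_branch(decide_which_path, ["branch_a", "branch_b"])
print(states)  # {'branch_a': 'scheduled', 'branch_b': 'skipped'}
```

So nothing "reads" the output in the downstream tasks themselves; the branch operator's own return value is what decides which of them gets to run.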

It's important to set the trigger_rule deliberately: the default is all_success, so any task with a skipped upstream task will itself be skipped, and the skip then carries on through the rest of the DAG.
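As a simplified model of why the default matters, the sketch below evaluates a few trigger rules against the upstream states that a task sitting after both branches would see. The `would_run` helper and the idea of a joining task are hypothetical, added only for illustration; real Airflow has more rules and also distinguishes between skipping and failing a task whose rule is not met.

```python
from typing import List

# States that count as "finished" in this simplified model.
FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def would_run(trigger_rule: str, upstream_states: List[str]) -> bool:
    """Simplified check of whether a task's trigger rule is satisfied."""
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "one_success":
        return any(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        return all(state in FINISHED for state in upstream_states)
    raise ValueError(f"rule not modelled in this sketch: {trigger_rule}")


# A hypothetical task downstream of both branches: branch_a ran, branch_b was skipped.
join_upstream = ["success", "skipped"]

for rule in ("all_success", "one_success", "all_done"):
    print(rule, would_run(rule, join_upstream))
```

With one branch skipped, only the default all_success rule stops the joining task from running in this model, which is the situation the trigger_rule setting is meant to avoid.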

answered Sep 16 '22 11:09 by Nick