In my DAG I want to skip a task (oracle_merge_hist_orig) depending on a flag.
My logic is:
- when oracle_branch=True, execute [merge_op, update_table_op, table_count_op]
- when oracle_branch=False, execute [update_table_op, table_count_op]
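Spelled out as plain Python, the intended outcome looks roughly like this. `tasks_to_run` is a hypothetical helper that is not part of the DAG; it only lists which task IDs should execute for each value of the flag.

```python
def tasks_to_run(oracle_branch: bool) -> list:
    """Hypothetical helper: task IDs that should run for a given flag value."""
    # These two tasks run on every path.
    common = ["update_table_job", "table_count"]
    if oracle_branch:
        # Only the Oracle path runs the merge, and it runs first.
        return ["oracle_merge_hist_orig"] + common
    return common


print(tasks_to_run(True))   # ['oracle_merge_hist_orig', 'update_table_job', 'table_count']
print(tasks_to_run(False))  # ['update_table_job', 'table_count']
```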
I tried to use BranchPythonOperator as follows:
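```python
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

# Flag that decides whether the Oracle merge should run.
oracle_branch = True


def branch_func():
    # Return the task_id of the branch to follow; the other branch gets skipped.
    if oracle_branch:
        return "oracle_branch"
    else:
        return "normal_branch"


dag = DAG(
    dag_id='example_branch_operator',
    default_args=args,
    schedule_interval="@daily",
)

branching_op = BranchPythonOperator(
    task_id='branch_shall_run_oracle_merge_original_hist',
    python_callable=branch_func,
    dag=dag)

# Variable names differ from the flag so that `oracle_branch` is not rebound
# to an operator object (otherwise `if oracle_branch:` would test the operator).
oracle_branch_op = DummyOperator(
    task_id='oracle_branch',
    dag=dag)
normal_branch_op = DummyOperator(
    task_id='normal_branch',
    dag=dag)

merge_op = DummyOperator(
    task_id='oracle_merge_hist_orig',
    dag=dag,
)
update_table_op = DummyOperator(
    task_id='update_table_job',
    dag=dag,
)
table_count_op = DummyOperator(
    task_id='table_count',
    dag=dag,
)

branching_op >> [oracle_branch_op, normal_branch_op]
normal_branch_op >> update_table_op >> table_count_op
oracle_branch_op >> merge_op >> update_table_op >> table_count_op
```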
However, instead of skipping only that task, it skips the entire path.
How do I fix this so that only the "oracle_merge_hist_orig" task is skipped?
[Image: run with oracle_branch=False]

[Image: run with oracle_branch=True]
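Every task has a trigger_rule, which defaults to all_success. You can override it with any of the other values listed in the Airflow documentation on trigger rules.
In your DAG, the update_table_job task has two upstream tasks: normal_branch and oracle_merge_hist_orig. Whichever branch is not chosen is skipped, so one of these two upstream tasks is always skipped. Under all_success, a skipped upstream task causes the task itself to be skipped, which then cascades to table_count. You can avoid this by overriding the default trigger_rule with one_success, as shown below: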
```python
update_table_op = DummyOperator(
    task_id='update_table_job',
    trigger_rule='one_success',
    dag=dag
)
```
Note: I tested this on Airflow 1.10.4.
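To see why the fix works, here is a simplified, hypothetical model of how task states propagate through this particular DAG. It is not Airflow's actual scheduler code: it only models the "success" and "skipped" states (no failures or retries), treats the branch operator as skipping whichever of oracle_branch/normal_branch it did not return, and applies the chosen trigger rule to update_table_job only.

```python
# Simplified model of state propagation for this DAG (success/skipped only).
UPSTREAM = {
    "branch_shall_run_oracle_merge_original_hist": [],
    "oracle_branch": ["branch_shall_run_oracle_merge_original_hist"],
    "normal_branch": ["branch_shall_run_oracle_merge_original_hist"],
    "oracle_merge_hist_orig": ["oracle_branch"],
    "update_table_job": ["normal_branch", "oracle_merge_hist_orig"],
    "table_count": ["update_table_job"],
}


def simulate(oracle_branch, update_rule="all_success"):
    chosen = "oracle_branch" if oracle_branch else "normal_branch"
    states = {}
    # Dict insertion order above is already a valid topological order.
    for task, parents in UPSTREAM.items():
        parent_states = [states[p] for p in parents]
        if task in ("oracle_branch", "normal_branch"):
            # The branch operator skips each direct downstream task it did not return.
            states[task] = "success" if task == chosen else "skipped"
            continue
        rule = update_rule if task == "update_table_job" else "all_success"
        if rule == "all_success":
            # Any skipped parent makes this task skipped as well.
            ok = all(s == "success" for s in parent_states)
        elif rule == "one_success":
            # A single successful parent is enough.
            ok = any(s == "success" for s in parent_states)
        else:
            raise ValueError(f"trigger rule not modelled here: {rule}")
        states[task] = "success" if ok else "skipped"
    return states


print(simulate(False)["table_count"])                 # skipped
print(simulate(False, "one_success")["table_count"])  # success
```

With the default all_success, update_table_job and table_count end up skipped for either flag value; with one_success only oracle_merge_hist_orig is skipped when the flag is False.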