I created a BranchPythonOperator that runs one of two tasks depending on a condition:
typicon_check_table = BranchPythonOperator(
    task_id='typicon_check_table',
    python_callable=CheckTable(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_create_table = PythonOperator(
    task_id='typicon_create_table',
    python_callable=CreateTable(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_load_data = PythonOperator(
    task_id='typicon_load_data',
    python_callable=LoadData(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_check_table.set_downstream([typicon_load_data, typicon_create_table])
typicon_create_table.set_downstream(typicon_load_data)
This is the CheckTable callable class:
class CheckTable:
    """
    DAG task to check if table exists or not.
    """
    def __call__(self, **kwargs) -> str:
        pg_hook = PostgresHook(postgres_conn_id="postgres_docker")
        query = "SELECT EXISTS ( \
            SELECT 1 FROM information_schema.tables \
            WHERE table_schema = 'public' \
            AND table_name = 'users');"
        table_exists = pg_hook.get_records(query)[0][0]
        # Return the task_id of the branch to follow.
        if table_exists:
            return "typicon_load_data"
        return "typicon_create_table"
The problem is that both tasks are skipped when the typicon_check_table task runs.
How can I fix this?

I have worked with the same scenario, and the code below works fine for me:
class DAGConditionalValidation:
    def __init__(self, conditional_param_key):
        self.conditional_param_key = conditional_param_key

    def __call__(self, **kwargs):
        # A branch callable must return the task_id (a string) of the task to follow.
        if self.conditional_param_key == 'Y':
            return 'slot_population_on_is_y'
        return 'slot_population_on_is_n'

slot_population_on_is_y_or_n = BranchPythonOperator(
    task_id='slot_population_on_is_y_or_n',
    python_callable=DAGConditionalValidation('Y'),
    trigger_rule='one_success')
slot_population_on_is_y = DummyOperator(task_id='slot_population_on_is_y')
slot_population_on_is_n = DummyOperator(task_id='slot_population_on_is_n')
slot_population_on_is_y_or_n >> [slot_population_on_is_y, slot_population_on_is_n]
Your code looks fine, but you are missing the trigger rule; please set trigger_rule='one_success'.
This should work for you as well.
The task typicon_load_data has typicon_create_table as a parent and the default trigger_rule is all_success, so I am not surprised by this behaviour.
Two possible cases here:
1. CheckTable() returns typicon_load_data: typicon_create_table is skipped, and typicon_load_data, being downstream of it, is also skipped.
2. CheckTable() returns typicon_create_table: that task is executed and it triggers typicon_load_data, which is skipped because it was the excluded branch.

I assume your screenshot is from case 1?
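
To make case 1 concrete, here is a minimal sketch of how a trigger rule could decide the fate of typicon_load_data once its upstream tasks have finished. This is a simplified stand-alone model, not Airflow's scheduler code: the function name `resolve` and the plain-string states are assumptions made for illustration, and only three trigger rules are modelled.

```python
def resolve(trigger_rule, upstream_states):
    """Decide what happens to a task whose upstream tasks have all finished.

    Simplified model (hypothetical helper, not part of Airflow).
    upstream_states maps upstream task_id -> "success", "skipped",
    "failed" or "upstream_failed".
    Returns "run", "skipped" or "upstream_failed".
    """
    states = list(upstream_states.values())
    any_failed = any(s in ("failed", "upstream_failed") for s in states)
    any_success = any(s == "success" for s in states)
    any_skipped = any(s == "skipped" for s in states)

    if trigger_rule == "all_success":
        # Default rule: a single skipped parent is enough to skip the task.
        if any_failed:
            return "upstream_failed"
        return "skipped" if any_skipped else "run"
    if trigger_rule == "one_success":
        # One successful parent is enough; skipped parents are ignored.
        if any_success:
            return "run"
        return "upstream_failed" if any_failed else "skipped"
    if trigger_rule == "none_failed":
        # Skipped parents are tolerated as long as nothing failed.
        return "upstream_failed" if any_failed else "run"
    raise ValueError("trigger rule not modelled: %s" % trigger_rule)


# Case 1: the branch chose typicon_load_data, so typicon_create_table was skipped.
case_1 = {"typicon_check_table": "success", "typicon_create_table": "skipped"}

for rule in ("all_success", "one_success", "none_failed"):
    print(rule, "->", resolve(rule, case_1))
```

In the question's DAG, this would correspond to passing trigger_rule to the typicon_load_data operator, since that is the task with the skipped parent. Note that one_success does not wait for all parents to finish, so in the create-table branch typicon_load_data could start before typicon_create_table completes; none_failed, in Airflow versions that provide it, waits for every parent and may be the safer choice here.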