Requirement: branch to the next task based on the value returned by a SQL query.
I tried to achieve this using BranchSQLOperator.
Error: the task fails with TypeError: 'XComArg' object is not iterable
Below is the SQL query (query.sql):
SELECT
    CASE
        WHEN COUNT(*) > 2 THEN 0 --false
        ELSE 1 --true
    END
FROM
    TABLE_A;
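The sketch below shows what this query hands to the operator. It runs the same CASE expression against a toy TABLE_A using Python's built-in sqlite3 module, which stands in for Snowflake here, and then maps the result to a branch. `run_branch_query` and `choose_branch` are hypothetical helpers, not Airflow APIs. The rule that a non-zero integer selects the "true" branch is an assumption, consistent with the log below, where a result of 1 led the operator to the `follow_task_ids_if_true` value.

```python
import sqlite3

# Same CASE expression as query.sql; sqlite3 stands in for Snowflake (assumption).
QUERY = """
SELECT
    CASE
        WHEN COUNT(*) > 2 THEN 0 -- false
        ELSE 1                   -- true
    END
FROM TABLE_A;
"""


def run_branch_query(row_count: int) -> int:
    """Build an in-memory TABLE_A with row_count rows and return the query's single value."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE TABLE_A (id INTEGER)")
        conn.executemany(
            "INSERT INTO TABLE_A (id) VALUES (?)",
            [(i,) for i in range(row_count)],
        )
        (value,) = conn.execute(QUERY).fetchone()
        return value
    finally:
        conn.close()


def choose_branch(query_result: int, if_true: str, if_false: str) -> str:
    """Hypothetical helper: a non-zero integer selects if_true, 0 selects if_false."""
    return if_true if bool(query_result) else if_false


if __name__ == "__main__":
    for n in (1, 5):
        result = run_branch_query(n)
        print(n, result, choose_branch(result, "task_a", "task_b"))
```

With 1 row the query returns 1 and the sketch picks "task_a"; with 5 rows it returns 0 and picks "task_b". Note that both branch arguments are plain task_id strings.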
DAG:
from airflow.decorators import task
from airflow.operators.sql import BranchSQLOperator

@task(task_id="task_a")
def task_a():
    return "job_success"

@task(task_id="task_b")
def task_b():
    return "job_failure"

task_a = task_a()
task_b = task_b()

branch_sql = BranchSQLOperator(
    task_id='branch_sql',
    sql='query.sql',
    follow_task_ids_if_true=task_a,
    follow_task_ids_if_false=task_b,
    conn_id='default_conn_id',
)

branch_sql >> [task_a, task_b]
Below is the Log:
[2022-07-20, 21:11:27 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-07-20, 21:11:27 UTC] {base.py:68} INFO - Using connection ID 'snflk_conn_id' for task execution.
[2022-07-20, 21:11:27 UTC] {base.py:68} INFO - Using connection ID 'snflk_conn_id' for task execution.
[2022-07-20, 21:11:27 UTC] {connection.py:272} INFO - Snowflake Connector for Python Version: 2.7.1, Python Version: 3.7.13, Platform: Linux-5.10.104-linuxkit-x86_64-with-debian-11.3
[2022-07-20, 21:11:27 UTC] {connection.py:879} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2022-07-20, 21:11:27 UTC] {connection.py:896} INFO - Setting use_openssl_only mode to False
[2022-07-20, 21:11:28 UTC] {cursor.py:696} INFO - query: [SELECT CASE WHEN COUNT(*) > 2 THEN 0 --false ELSE 1 --true END FROM TABLE...]
[2022-07-20, 21:11:28 UTC] {cursor.py:720} INFO - query execution done
[2022-07-20, 21:11:28 UTC] {connection.py:509} INFO - closed
[2022-07-20, 21:11:28 UTC] {connection.py:512} INFO - No async queries seem to be running, deleting session
[2022-07-20, 21:11:28 UTC] {sql.py:531} INFO - Query returns 1, type '<class 'int'>'
[2022-07-20, 21:11:28 UTC] {skipmixin.py:140} INFO - Following branch {{ task_instance.xcom_pull(task_ids='task_a', dag_id='abc', key='return_value') }}
[2022-07-20, 21:11:28 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/sql.py", line 557, in execute
self.skip_all_except(context["ti"], follow_branch)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/skipmixin.py", line 146, in skip_all_except
branch_task_ids = set(branch_task_ids)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom_arg.py", line 92, in __iter__
raise TypeError(f"{self.__class__.__name__!r} object is not iterable")
TypeError: 'XComArg' object is not iterable
[2022-07-20, 21:11:28 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=abc, task_id=branch_sql, execution_date=20220720T211124, start_date=20220720T211126, end_date=20220720T211128
[2022-07-20, 21:11:28 UTC] {standard_task_runner.py:97} ERROR - Failed to execute job 1034 for task branch_sql ('XComArg' object is not iterable; 5827)
[2022-07-20, 21:11:28 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-07-20, 21:11:28 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
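BranchSQLOperator expects follow_task_ids_if_true and follow_task_ids_if_false to be task IDs (a task_id string or a list of task_id strings), but you are passing the objects returned by calling the @task-decorated functions. Calling task_a() does not return a task ID; it returns an XComArg. That is why the log shows "Following branch {{ task_instance.xcom_pull(task_ids='task_a', dag_id='abc', key='return_value') }}" instead of a task ID, and why skip_all_except fails when it calls set(branch_task_ids): 'XComArg' object is not iterable.
In your code, task_a and task_b are XComArg objects, while "task_a" and "task_b" are the task IDs.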
Replace:
follow_task_ids_if_true=task_a,
follow_task_ids_if_false=task_b,
with:
follow_task_ids_if_true="task_a",
follow_task_ids_if_false="task_b",
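A simplified, hypothetical model of the failing step may make the fix clearer. The traceback shows skip_all_except calling set(branch_task_ids), which works for a list of task_id strings but not for an object that refuses iteration. FakeXComArg and normalize_branch_ids below are illustrative stand-ins, not Airflow's classes. The special case for a single string is an assumption about why passing "task_a" works, since calling set() on a bare string would split it into characters.

```python
from typing import Iterable, Set, Union


class FakeXComArg:
    """Hypothetical stand-in for what calling a @task function returns.

    Like the XComArg in the traceback, it refuses to be iterated.
    """

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def __iter__(self):
        raise TypeError(f"{self.__class__.__name__!r} object is not iterable")


def normalize_branch_ids(branch_task_ids: Union[None, str, Iterable[str]]) -> Set[str]:
    """Simplified model of the set(branch_task_ids) step shown in the traceback."""
    if isinstance(branch_task_ids, str):
        # A single task_id string is wrapped, not split into characters.
        return {branch_task_ids}
    if branch_task_ids is None:
        return set()
    # Anything else must be an iterable of task_id strings.
    return set(branch_task_ids)


if __name__ == "__main__":
    print(normalize_branch_ids("task_a"))  # {'task_a'}
    try:
        normalize_branch_ids(FakeXComArg("task_a"))
    except TypeError as exc:
        print(exc)  # 'FakeXComArg' object is not iterable
```

In this model the string "task_a" normalizes cleanly, while the object produced by calling the task function raises the same kind of TypeError seen in the log.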