
Airflow: BranchSQLOperator: TypeError: 'XComArg' object is not iterable

Requirement: branch to the next task based on the value returned by a SQL query.

I tried to do this with BranchSQLOperator.

Error: the task fails with TypeError: 'XComArg' object is not iterable

Below is the SQL query:

SELECT
    CASE
        WHEN COUNT(*) > 2 THEN 0 --false
        ELSE 1 --true
    END
FROM
    TABLE_A;

DAG:

@task(task_id="task_a")
def task_a():
    return "job_success"

@task(task_id="task_b")
def task_b():
    return "job_failure"

task_a = task_a()
task_b = task_b()
branch_sql = BranchSQLOperator(
    task_id='branch_sql',
    sql='query.sql',
    follow_task_ids_if_true=task_a,
    follow_task_ids_if_false=task_b,
    conn_id='default_conn_id',
)

branch_sql >> [task_a, task_b]


Below is the Log:

[2022-07-20, 21:11:27 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-07-20, 21:11:27 UTC] {base.py:68} INFO - Using connection ID 'snflk_conn_id' for task execution.
[2022-07-20, 21:11:27 UTC] {base.py:68} INFO - Using connection ID 'snflk_conn_id' for task execution.
[2022-07-20, 21:11:27 UTC] {connection.py:272} INFO - Snowflake Connector for Python Version: 2.7.1, Python Version: 3.7.13, Platform: Linux-5.10.104-linuxkit-x86_64-with-debian-11.3
[2022-07-20, 21:11:27 UTC] {connection.py:879} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2022-07-20, 21:11:27 UTC] {connection.py:896} INFO - Setting use_openssl_only mode to False
[2022-07-20, 21:11:28 UTC] {cursor.py:696} INFO - query: [SELECT CASE WHEN COUNT(*) > 2 THEN 0 --false ELSE 1 --true END FROM TABLE...]
[2022-07-20, 21:11:28 UTC] {cursor.py:720} INFO - query execution done
[2022-07-20, 21:11:28 UTC] {connection.py:509} INFO - closed
[2022-07-20, 21:11:28 UTC] {connection.py:512} INFO - No async queries seem to be running, deleting session
[2022-07-20, 21:11:28 UTC] {sql.py:531} INFO - Query returns 1, type '<class 'int'>'
[2022-07-20, 21:11:28 UTC] {skipmixin.py:140} INFO - Following branch {{ task_instance.xcom_pull(task_ids='task_a', dag_id='abc', key='return_value') }}
[2022-07-20, 21:11:28 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/sql.py", line 557, in execute
    self.skip_all_except(context["ti"], follow_branch)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/skipmixin.py", line 146, in skip_all_except
    branch_task_ids = set(branch_task_ids)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom_arg.py", line 92, in __iter__
    raise TypeError(f"{self.__class__.__name__!r} object is not iterable")
TypeError: 'XComArg' object is not iterable
[2022-07-20, 21:11:28 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=abc, task_id=branch_sql, execution_date=20220720T211124, start_date=20220720T211126, end_date=20220720T211128
[2022-07-20, 21:11:28 UTC] {standard_task_runner.py:97} ERROR - Failed to execute job 1034 for task branch_sql ('XComArg' object is not iterable; 5827)
[2022-07-20, 21:11:28 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-07-20, 21:11:28 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
Kar asked Nov 28 '25 17:11


1 Answer

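BranchSQLOperator expects follow_task_ids_if_true and follow_task_ids_if_false to be task_id strings (or lists of them), but you are passing objects. After task_a = task_a() and task_b = task_b(), the names task_a and task_b refer to XComArg objects, not to the task IDs "task_a" and "task_b". That is what the error says: the f-string {self.__class__.__name__!r} object is not iterable in the traceback renders as 'XComArg' object is not iterable, and it is raised when skip_all_except calls set(branch_task_ids).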

You should replace

follow_task_ids_if_true = task_a,
follow_task_ids_if_false = task_b,

With

follow_task_ids_if_true = "task_a",
follow_task_ids_if_false = "task_b",
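
To see why a string works where the object does not, here is a minimal sketch that needs no Airflow install. FakeXComArg and to_task_id_set are hypothetical stand-ins written for illustration, not Airflow's own code; they only mimic the behaviour visible in the traceback (an __iter__ that raises, and a set(...) call on the branch IDs), and the string special-casing is an assumption about how the IDs are normalised.

```python
class FakeXComArg:
    """Hypothetical stand-in for the object a called @task function returns."""

    def __init__(self, task_id):
        self.task_id = task_id

    def __iter__(self):
        # Same message pattern as the one shown in the traceback.
        raise TypeError(f"{self.__class__.__name__!r} object is not iterable")


def to_task_id_set(branch_task_ids):
    """Assumed, simplified normalisation of branch IDs into a set of strings."""
    if isinstance(branch_task_ids, str):
        # A single task_id string is wrapped, not iterated character by character.
        return {branch_task_ids}
    # Anything else must be iterable; this is where the object form fails.
    return set(branch_task_ids)


print(to_task_id_set("task_a"))              # a one-element set
print(to_task_id_set(["task_a", "task_b"]))  # a two-element set

try:
    to_task_id_set(FakeXComArg("task_a"))
except TypeError as exc:
    print(f"TypeError: {exc}")
```

The string and the list both go through, while the object raises as soon as set(...) tries to iterate it, which matches the failure in the log above.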
Elad Kalif answered Dec 02 '25 00:12


