Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Airflow: How to push xcom value from BigQueryOperator?

Tags:

airflow

This is my operator:

bigquery_check_op = BigQueryOperator(
    task_id='bigquery_check',
    bql=SQL_QUERY,
    use_legacy_sql = False,
    bigquery_conn_id=CONNECTION_ID,
    trigger_rule='all_success',
    xcom_push=True,
    dag=dag
)

When I check the Render page in the UI. Nothing appears there. When I run the SQL in the console it return value 1400 which is correct. Why the operator doesn't push the XCOM?

I can't use BigQueryValueCheckOperator. This operator is designed to FAIL against a check of value. I don't want nothing to fail. I simply want to branch the code based on the return value from the query.

like image 212
Programmer120 Avatar asked Oct 16 '22 12:10

Programmer120


1 Answers

Here is how you might be able to accomplish this with the BigQueryHook and the BranchPythonOperator:

from airflow.operators.python_operator import BranchPythonOperator
from airflow.contrib.hooks import BigQueryHook

def big_query_check(**context):
    sql = context['templates_dict']['sql']
    bq = BigQueryHook(bigquery_conn_id='default_gcp_connection_id',
                        use_legacy_sql=False)
    conn = bq.get_conn()
    cursor = conn.cursor()
    results = cursor.execute(sql)

    # Do something with results, return task_id to branch to
    if results == 0:
        return "task_a"
    else:
        return "task_b"

  
sql = "SELECT COUNT(*) FROM sales"


branching = BranchPythonOperator(
    task_id='branching',
    python_callable=big_query_check,
    provide_context= True,
    templates_dict = {"sql": sql}
    dag=dag,
)

First we create a python callable that we can use to execute the query and select which task_id to branch too. Second, we create the BranchPythonOperator.

like image 161
Mike Avatar answered Oct 21 '22 03:10

Mike