Main Problem: I am trying to create a BigQuery table if it does not exist.
Approach: Use BigQueryTableSensor to check whether the table exists and, based on the return value, create (or not) a new table using BigQueryCreateEmptyTableOperator.
Problem: I am not able to get the return value of the BigQueryTableSensor via XCom. As we know, the poke method needs to return a boolean value.
This is how I created my task:
check_if_table_exists = BigQueryTableSensor(
    task_id='check_if_table_exists',
    project_id='my_project',
    dataset_id='my_dataset',
    table_id='my_table',
    bigquery_conn_id='bigquery_default',
    timeout=120,
    do_xcom_push=True,
)
# Output: INFO - Success criteria met. Exiting.
get_results = BashOperator(
    task_id='get_results',
    bash_command="echo {{ ti.xcom_pull(task_ids='check_if_table_exists') }}"
)
# Output: INFO - Running command: echo None
Looking at the Airflow interface, I confirmed that BigQueryTableSensor didn't push anything :(
Questions:
Is there a way I can get the return value of my sensor?
Is there a better approach to solve my main problem? Maybe using BigQueryOperator with a SQL query like "CREATE TABLE IF NOT EXISTS".
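For reference, the idempotent DDL that the question alludes to would look like this (the column definitions here are placeholders; the table name is the one from the sensor example):

```sql
-- Creates the table only if it is missing; a no-op otherwise.
CREATE TABLE IF NOT EXISTS `my_project.my_dataset.my_table` (
  id INT64,
  created_at TIMESTAMP
);
```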
Yes, it's possible. I made it work like this:
class MyCustomSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(MyCustomSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        application_id = context['ti'].xcom_pull(key='application_id')
        print("We found " + application_id)
        return True
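It also helps to remember why the sensor pushes nothing in the first place: the scheduler only uses poke's boolean to decide whether to keep waiting, and the value itself is discarded rather than written to XCom. A rough, library-free sketch of that loop (names are illustrative, not Airflow's real internals):

```python
import time

def run_sensor(poke, poke_interval=1.0, timeout=5.0):
    """Re-invoke poke() until it returns True or the timeout expires.

    Mimics the behaviour described above: the boolean only controls
    the loop; it is never stored the way an XCom value would be.
    """
    started = time.monotonic()
    while not poke():
        if time.monotonic() - started > timeout:
            raise TimeoutError("Sensor timed out")
        time.sleep(poke_interval)
    # Success criteria met -- nothing is returned or pushed.

# Usage: a poke that succeeds on the third try.
attempts = {"n": 0}
def fake_poke():
    attempts["n"] += 1
    return attempts["n"] >= 3

run_sensor(fake_poke, poke_interval=0.01)
print(attempts["n"])  # -> 3
```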
Here is a full DAG example:
import os
import sys
from datetime import datetime

from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

dag = DAG('my_dag_name',
          description='DAG ',
          schedule_interval=None,
          start_date=datetime(2021, 1, 7),
          tags=["samples"],
          catchup=False)

class MyCustomSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(MyCustomSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        application_id = context['ti'].xcom_pull(key='application_id')
        print("We found " + application_id)
        return True

def launch_spark_job(**kwargs):
    application_id = "application_1613995447156_11473"
    kwargs['ti'].xcom_push(key='application_id', value=application_id)

launch_spark_job_op = PythonOperator(task_id='test_python',
                                     python_callable=launch_spark_job,
                                     provide_context=True,
                                     dag=dag)

wait_spark_job_sens = MyCustomSensor(task_id='wait_spark_job',
                                     dag=dag,
                                     mode="reschedule")

launch_spark_job_op >> wait_spark_job_sens
This is not the use case for Sensors.
Sensors make the workflow wait for something to happen. In your case, BigQueryTableSensor will wait until the table is created by some other process and only then continue to the downstream task.
What you are looking for is either:
BigQueryCheckOperator, to run a query that returns a boolean value (True if the table exists, False otherwise); then you will be able to pull the boolean value from XCom in your BashOperator.
BranchSQLOperator, where the workflow branches based on the result of a SQL query that checks if the table exists. In that option there is no need to use XCom.
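As a rough illustration of the second option: a SQL-branching operator maps the boolean result of the check query to one of two downstream task ids (the real BranchSQLOperator exposes follow_task_ids_if_true / follow_task_ids_if_false parameters for this; everything else below is a library-free sketch with hypothetical task ids):

```python
def choose_downstream(table_exists,
                      follow_task_ids_if_true="skip_create",
                      follow_task_ids_if_false="create_table"):
    # Mirrors what a SQL-branching operator does: the query result
    # selects which downstream task id the DAG follows next.
    return follow_task_ids_if_true if table_exists else follow_task_ids_if_false

print(choose_downstream(True))   # -> skip_create
print(choose_downstream(False))  # -> create_table
```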