
How can I pull an XCom value from an Airflow sensor?

Tags:

python

airflow

Main Problem: I am trying to create a BigQuery table if it does not already exist.

Approach: Use BigQueryTableSensor to check whether the table exists and, based on the return value, create a new table (or not) with BigQueryCreateEmptyTableOperator.

Problem: I'm not able to get the return value of the BigQueryTableSensor through XCom. As we know, the poke method needs to return a boolean value.

This is how I created my task:

check_if_table_exists = BigQueryTableSensor(
        task_id='check_if_table_exists',
        project_id='my_project',
        dataset_id='my_dataset',
        table_id='my_table',
        bigquery_conn_id='bigquery_default',
        timeout=120,
        do_xcom_push=True,
    )

# Output: INFO - Success criteria met. Exiting.

get_results = BashOperator(
        task_id='get_results',
        bash_command="echo {{ ti.xcom_pull(task_ids='check_if_table_exists') }}"
    )

# Output: INFO - Running command: echo None

Looking at the Airflow interface, I saw that BigQueryTableSensor didn't push anything :(

Question:

  • Is there a way I can get the return value of my sensor?

  • Is there a better approach to solve my main problem? Maybe using BigQueryOperator and a SQL query like "CREATE TABLE IF NOT EXISTS".
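
For the second option, a minimal sketch of how such a statement could be built. The column names and types are hypothetical (no schema is given here), and the helper name is made up for illustration; the resulting string would be passed to whichever operator runs the query.

```python
from typing import Iterable, Tuple


def create_table_if_not_exists_sql(
    project: str,
    dataset: str,
    table: str,
    columns: Iterable[Tuple[str, str]],
) -> str:
    """Build a BigQuery DDL statement that is a no-op when the table already exists."""
    # Each column is a (name, type) pair, e.g. ("id", "INT64").
    column_list = ", ".join(f"{name} {col_type}" for name, col_type in columns)
    return f"CREATE TABLE IF NOT EXISTS `{project}.{dataset}.{table}` ({column_list})"


# Hypothetical schema, for illustration only.
sql = create_table_if_not_exists_sql(
    "my_project", "my_dataset", "my_table", [("id", "INT64"), ("name", "STRING")]
)
```

With this approach there is no need for a separate existence check or for XCom, since the statement itself decides whether to create the table.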

Jonas Ferreira asked Dec 22 '22 15:12


2 Answers

Yes, it's possible. I made it work like this:

class MyCustomSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs):
        super(MyCustomSensor, self).__init__(*args, **kwargs)

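    def poke(self, context):
        # Pull the value pushed by an upstream task; None means it is not available yet.
        application_id = context['ti'].xcom_pull(key='application_id')
        if application_id is None:
            return False  # keep poking instead of failing on string concatenation
        print("We found " + application_id)
        return True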

Here is a full DAG example:

import os
import sys
from datetime import datetime
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


dag = DAG('my_dag_name',
          description='DAG ',
          schedule_interval=None,
          start_date=datetime(2021, 1, 7),
          tags=["samples"],
          catchup=False)

class MyCustomSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs):
        super(MyCustomSensor, self).__init__(*args, **kwargs)

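    def poke(self, context):
        # Pull the value pushed by an upstream task; None means it is not available yet.
        application_id = context['ti'].xcom_pull(key='application_id')
        if application_id is None:
            return False  # keep poking instead of failing on string concatenation
        print("We found " + application_id)
        return True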


def launch_spark_job(**kwargs):
    application_id = "application_1613995447156_11473"
    kwargs['ti'].xcom_push(key='application_id', value=application_id)


launch_spark_job_op = PythonOperator(task_id='test_python',
                                     python_callable=launch_spark_job,
                                     provide_context=True,
                                     dag=dag)

wait_spark_job_sens = MyCustomSensor(task_id='wait_spark_job',
                                     dag=dag,
                                     mode="reschedule")

launch_spark_job_op >> wait_spark_job_sens
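
The same context['ti'] object can also be used in the other direction, to push a value from inside poke. Below is a minimal sketch of that idea, written so it runs without Airflow: FakeTaskInstance is a hypothetical stand-in for the real task instance, and poke_and_push and the 'table_exists' key are made-up names. In a real sensor the body of poke_and_push would live in the poke method.

```python
from typing import Any, Callable, Dict


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance, for local testing only."""

    def __init__(self) -> None:
        self.store: Dict[str, Any] = {}

    def xcom_push(self, key: str, value: Any) -> None:
        self.store[key] = value

    def xcom_pull(self, key: str) -> Any:
        return self.store.get(key)


def poke_and_push(context: Dict[str, Any], table_exists: Callable[[], bool]) -> bool:
    """Run the check, record its result under an explicit XCom key, then report it."""
    found = table_exists()
    context['ti'].xcom_push(key='table_exists', value=found)
    return found


# Local usage with the stand-in object.
ti = FakeTaskInstance()
result = poke_and_push({'ti': ti}, lambda: True)
```

Keep in mind that downstream tasks only run once poke has returned True, so in the usual case the value they pull is the one pushed on the final, successful poke.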
tdebroc answered Jan 10 '23 20:01


This is not the use case for sensors. Sensors make the workflow wait for something to happen. In your case, BigQueryTableSensor will wait until the table is created by some other process, and only then will the workflow continue to the downstream task.
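
To illustrate why the pulled value is None, here is a simplified model of what a sensor does with the result of poke. This is a sketch under assumptions, not Airflow's actual implementation; run_sensor and its parameters are names made up for this example.

```python
import time
from typing import Callable


def run_sensor(poke: Callable[[], bool], poke_interval: float = 0.0, timeout: float = 1.0) -> None:
    """Call poke() repeatedly until it returns True or the timeout expires."""
    started = time.monotonic()
    while not poke():
        if time.monotonic() - started > timeout:
            raise TimeoutError("sensor timed out")
        time.sleep(poke_interval)
    # The boolean is consumed by the loop; nothing is returned to the caller,
    # so there is no return value to store.


# The condition becomes true on the third poke.
answers = iter([False, False, True])
outcome = run_sensor(lambda: next(answers))
```

The boolean only controls the waiting: the task either succeeds once the condition is met or fails on timeout, so it never produces a False result for a downstream task to read.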

What you are looking for is either:

  1. Use BigQueryCheckOperator to run a query that returns a boolean value (True if the table exists, False otherwise); you will then be able to pull the boolean value from XCom in your BashOperator.
  2. Use a branch operator (like BranchSQLOperator), where the workflow branches based on the result of a SQL query that checks whether the table exists. With that option there is no need to use XCom.
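
A minimal sketch of the branching idea in option 2, written as a plain Python callable so it can be tried without Airflow. It assumes the function would be wired in as the python_callable of a BranchPythonOperator rather than BranchSQLOperator; the task ids 'create_table' and 'skip_create' and the table_exists check are hypothetical.

```python
from typing import Callable


def choose_next_task(
    table_exists: Callable[[], bool],
    create_task_id: str = "create_table",
    skip_task_id: str = "skip_create",
) -> str:
    """Return the task_id of the branch the workflow should follow."""
    # A branch callable returns the id of the downstream task to run;
    # the other branch is skipped.
    return skip_task_id if table_exists() else create_task_id


# Local usage with stubbed existence checks.
when_missing = choose_next_task(lambda: False)
when_present = choose_next_task(lambda: True)
```

The existence check is passed in as a callable so the decision logic can be tested on its own; in a DAG it would be replaced by a real lookup against BigQuery.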
Elad Kalif answered Jan 10 '23 22:01