Save result of operator in Apache Airflow

Several operators allow you to pull data, but I have never managed to use the results.

For example: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/bigquery_get_data.py

This operator can be called as follows:

from airflow.contrib.operators.bigquery_get_data import BigQueryGetDataOperator

get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='test_dataset',
    table_id='Transaction_partitions',
    max_results='100',
    selected_fields='DATE',
    bigquery_conn_id='airflow-service-account'
)

Yet get_data is of type DAG, while line 116 of that file says "return table_data". To be clear, the operator works and retrieves the data; I just don't understand how to use the retrieved data or where it is stored.

How do I get the data using "get_data" above?
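On what "return table_data" actually holds: the BigQuery `tabledata.list` REST response nests each row as `{"f": [{"v": value}, ...]}`. The sketch below flattens such a response into a plain list of lists, which is assumed here to be roughly the shape the operator builds before returning `table_data`. The function name `flatten_tabledata` and the sample response are hypothetical illustrations, not part of Airflow.

```python
from typing import Any, Dict, List


def flatten_tabledata(response: Dict[str, Any]) -> List[List[Any]]:
    """Turn a tabledata.list-style response into a list of rows.

    Each row in the REST response looks like {"f": [{"v": value}, ...]};
    only the cell values are kept, in column order.
    """
    table_data = []
    # "rows" may be absent when there is nothing to return, so default to [].
    for dict_row in response.get("rows", []):
        table_data.append([cell["v"] for cell in dict_row["f"]])
    return table_data


# Hypothetical sample response with a single DATE column.
sample = {
    "totalRows": "2",
    "rows": [
        {"f": [{"v": "2018-12-01"}]},
        {"f": [{"v": "2018-12-02"}]},
    ],
}
print(flatten_tabledata(sample))  # [['2018-12-01'], ['2018-12-02']]
```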

Breathe, asked Dec 28 '18 14:12


2 Answers

To use the result of get_data, add a downstream task, for example a PythonOperator, that pulls the data and processes it:

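from airflow.contrib.operators.bigquery_get_data import BigQueryGetDataOperator
from airflow.operators.python_operator import PythonOperator

get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='test_dataset',
    table_id='Transaction_partitions',
    max_results='100',
    selected_fields='DATE',
    bigquery_conn_id='airflow-service-account'
)

def process_data_from_bq(**kwargs):
    # With provide_context=True, Airflow passes the task instance as 'ti'
    ti = kwargs['ti']
    # Pull the value returned by the get_data_from_bq task (stored in XCom)
    bq_data = ti.xcom_pull(task_ids='get_data_from_bq')
    # bq_data now holds your data as a Python list
    print(bq_data)

process_data = PythonOperator(
    task_id='process_data_from_bq',
    python_callable=process_data_from_bq,
    provide_context=True
)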

get_data >> process_data
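Once `bq_data` is in hand, processing it is ordinary Python. The sketch below is a hypothetical helper (`date_range` is not an Airflow function) that could be called from `process_data_from_bq`. It assumes each row is a list whose first cell is an ISO date string such as `'2018-12-01'`, matching `selected_fields='DATE'`; check the actual values your operator returns before relying on that.

```python
from datetime import date, datetime
from typing import List, Optional, Tuple


def date_range(bq_data: Optional[List[List[str]]]) -> Optional[Tuple[date, date]]:
    """Return (earliest, latest) DATE from rows pulled out of XCom.

    Assumes each row is a list whose first cell is a 'YYYY-MM-DD' string.
    """
    if not bq_data:
        # xcom_pull returns None when nothing was pushed; treat it like no rows.
        return None
    # strptime is used instead of date.fromisoformat to also work on Python 3.6.
    dates = [datetime.strptime(row[0], "%Y-%m-%d").date() for row in bq_data]
    return min(dates), max(dates)


print(date_range([['2018-12-02'], ['2018-12-01']]))
# (datetime.date(2018, 12, 1), datetime.date(2018, 12, 2))
```

Inside the callable above, this would be a single call such as `print(date_range(bq_data))`.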

PS: I am the author of BigQueryGetDataOperator and an Airflow committer / PMC member.

kaxil, answered Oct 01 '22 20:10


The operator's return value is saved in an XCom. You can access it from another task by pulling it from XCom, for example:

data = ti.xcom_pull(task_ids='get_data_from_bq')
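To make the "saved in an XCom" mechanism concrete, here is a toy, in-memory model. `ToyXComStore` is hypothetical and is not Airflow's API; it only mimics the behaviour that, by default, a non-None value returned from an operator's `execute()` is pushed under the key `"return_value"`, which is what `xcom_pull(task_ids=...)` reads when no key is given.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class ToyXComStore:
    """Hypothetical stand-in for Airflow's XCom storage (not Airflow's API)."""

    def __init__(self) -> None:
        # (task_id, key) -> value. Real XComs are also keyed by dag_id and run.
        self._store: Dict[Tuple[str, str], Any] = {}

    def run_task(self, task_id: str, execute: Callable[[], Any]) -> None:
        # Mimics Airflow pushing a non-None return value under "return_value".
        result = execute()
        if result is not None:
            self._store[(task_id, "return_value")] = result

    def xcom_pull(self, task_ids: str, key: str = "return_value") -> Optional[Any]:
        # Returns None when the task pushed nothing, like Airflow's xcom_pull.
        return self._store.get((task_ids, key))


xcoms = ToyXComStore()
# Pretend get_data_from_bq returned two single-column rows.
xcoms.run_task("get_data_from_bq", lambda: [["2018-12-01"], ["2018-12-02"]])
data = xcoms.xcom_pull(task_ids="get_data_from_bq")
print(data)  # [['2018-12-01'], ['2018-12-02']]
```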

SergiyKolesnikov, answered Oct 01 '22 19:10