
Airflow BigQueryOperator: how to save query result in a partitioned Table?

I have a simple DAG:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(dag_id='my_dags.my_dag') as dag:

    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    sql = """
             SELECT *
             FROM `another_dataset.another_table`
          """
    bq_query = BigQueryOperator(bql=sql,
                                destination_dataset_table='my_dataset.my_table20180524',
                                task_id='bq_query',
                                bigquery_conn_id='my_bq_connection',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                query_params={})

    start >> bq_query >> end

When executing the bq_query task, the query result gets saved in a sharded table. I want it to be saved in a daily partitioned table instead. To do so, I only changed destination_dataset_table to my_dataset.my_table$20180524. I got the error below when executing bq_query:

Partitioning specification must be provided in order to create partitioned table

How can I tell BigQuery to save the query result to a daily partitioned table? My first guess was to use query_params in BigQueryOperator, but I didn't find any example of how to use that parameter.

EDIT:

I'm using the google-cloud==0.27.0 Python client ... and it's the one used in prod :(

asked May 24 '18 by MassyB


People also ask

How to validate data in BigQuery using airflow BigQuery operators?

With Airflow BigQuery operators, you can also validate your data and check whether or not the executed SQL query returned valid data. In this case, you can use the "BigQueryCheckOperator". A sketch of its usage is given below:
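A minimal sketch, assuming the Airflow 1.x contrib import path and reusing the question's my_bq_connection connection id:

from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

# Fails the task if the first value of the first returned row is falsy (e.g. COUNT(*) == 0).
bq_check = BigQueryCheckOperator(task_id='bq_check',
                                 sql='SELECT COUNT(*) FROM `my_dataset.my_table`',
                                 bigquery_conn_id='my_bq_connection',
                                 use_legacy_sql=False)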

How to create ingestion-time partitioned tables using bigqueryoperator?

Using BigQueryOperator, you can pass the time_partitioning parameter, which will create ingestion-time partitioned tables, as sketched below.
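A minimal sketch, assuming a recent Airflow 1.x build of the operator where the time_partitioning dict is supported (it follows the BigQuery API's timePartitioning shape):

bq_partitioned = BigQueryOperator(task_id='bq_partitioned',
                                  bql=sql,
                                  destination_dataset_table='my_dataset.my_table',
                                  use_legacy_sql=False,
                                  write_disposition='WRITE_TRUNCATE',
                                  create_disposition='CREATE_IF_NEEDED',
                                  # creates/targets a daily ingestion-time partitioned table
                                  time_partitioning={'type': 'DAY'})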

When to use query_params in BigQuery?

You could use it if you need to provide some parameters that are not supported by BigQueryOperator's other arguments. Per the operator's documentation: query_params (list) – a list of dictionaries containing query parameter types and values, passed to BigQuery; schema_update_options (tuple) – allows the schema of the destination table to be updated as a side effect of the load job.
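A hypothetical sketch of query_params, following the BigQuery API's query-parameter shape; the parameter name id and its value here are made up:

bq_param_query = BigQueryOperator(task_id='bq_param_query',
                                  bql='SELECT * FROM `another_dataset.another_table` WHERE id = @id',
                                  use_legacy_sql=False,
                                  # each entry mirrors the API's queryParameters structure
                                  query_params=[{'name': 'id',
                                                 'parameterType': {'type': 'INT64'},
                                                 'parameterValue': {'value': '42'}}])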

What is the use of helper class in BigQuery?

Airflow ships a helper class for constructing the BigQuery console link; the BigQueryOperator itself executes BigQuery SQL queries in a specific BigQuery database. Its bql parameter (deprecated in favor of sql) can receive a str representing a SQL statement, a list of str (SQL statements), or a reference to a template file; template references are recognized by a str ending in '.sql'.
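As an illustration of the template-file form, a minimal sketch; the path queries/daily_select.sql is hypothetical and would need to resolve from the DAG's template search path:

bq_from_file = BigQueryOperator(task_id='bq_from_file',
                                # a str ending in '.sql' is treated as a Jinja template reference
                                bql='queries/daily_select.sql',
                                use_legacy_sql=False)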


1 Answer

You first need to create an empty partitioned destination table; follow the instructions in the BigQuery documentation on creating an empty partitioned table.
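For example, a minimal sketch with the bq command-line tool (not part of the original answer; the table name matches the DAG below):

bq mk --table --time_partitioning_type=DAY my_dataset.my_table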

Then run the Airflow pipeline below again. You can try this code:

import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Build the partition decorator for today's daily partition, e.g. my_dataset.my_table$20180524
today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date

with DAG(dag_id='my_dags.my_dag') as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    sql = """
          SELECT *
          FROM `another_dataset.another_table`
          """
    bq_query = BigQueryOperator(bql=sql,
                                # destination_dataset_table is a templated field,
                                # rendered from params at run time
                                destination_dataset_table='{{ params.t_name }}',
                                task_id='bq_query',
                                bigquery_conn_id='my_bq_connection',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                params={'t_name': table_name})

    start >> bq_query >> end

So what I did was create a dynamic table-name variable (including the partition decorator) and pass it to the BigQuery operator via the templated destination_dataset_table field.

answered Nov 16 '22 by gruby