I have a simple DAG:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(dag_id='my_dags.my_dag') as dag:

    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    sql = """
    SELECT *
    FROM `another_dataset.another_table`
    """

    bq_query = BigQueryOperator(bql=sql,
                                destination_dataset_table='my_dataset.my_table20180524',
                                task_id='bq_query',
                                bigquery_conn_id='my_bq_connection',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                query_params={})

    start >> bq_query >> end
When executing the bq_query task, the query result gets saved in a sharded table. I want it saved in a daily partitioned table instead. To do so, I only changed destination_dataset_table to my_dataset.my_table$20180524. I got the error below when executing bq_query:
Partitioning specification must be provided in order to create partitioned table
How can I tell BigQuery to save the query result to a daily partitioned table? My first guess was to use query_params in BigQueryOperator, but I didn't find any example of how to use that parameter.
EDIT:
I'm using the google-cloud==0.27.0 Python client ... and it's the one used in prod :(
With the Airflow BigQuery operators you can also validate your data and check whether the executed SQL query returned valid results. For that you can use the BigQueryCheckOperator.
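A minimal sketch of what that could look like, assuming the contrib import path and a validation query chosen for illustration; the operator fails the task if the first row of the query result contains any falsy value:

from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

check_data = BigQueryCheckOperator(
    task_id='check_data',
    # hypothetical validation query against the table produced above
    sql='SELECT COUNT(*) FROM my_dataset.my_table20180524',
    bigquery_conn_id='my_bq_connection',
    dag=dag)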
With BigQueryOperator you can also pass the time_partitioning parameter, which creates ingestion-time partitioned tables.
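A minimal sketch of that approach, assuming Airflow 1.10+ (where time_partitioning is available and sql replaces the deprecated bql); the SQL, connection id and table name are carried over from the question:

bq_query = BigQueryOperator(
    task_id='bq_query',
    sql=sql,  # bql=sql on older Airflow versions
    # the $ suffix targets the daily partition for the execution date
    destination_dataset_table='my_dataset.my_table${{ ds_nodash }}',
    # tells BigQuery to create a day-partitioned table if it does not exist
    time_partitioning={'type': 'DAY'},
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    bigquery_conn_id='my_bq_connection',
    dag=dag)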
From the BigQueryOperator documentation (the operator executes BigQuery SQL queries in a specific BigQuery database), the parameters relevant here are:
bql – (Deprecated; use sql on newer versions.) Can receive a str representing a SQL statement, a list of str (SQL statements), or a reference to a template file; template references are recognized by a str ending in '.sql'.
schema_update_options (tuple) – Allows the schema of the destination table to be updated as a side effect of the load job.
query_params (list) – A list of dictionaries containing query parameter types and values, passed to BigQuery.
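Note that query_params parameterizes the query text itself (via @name placeholders in standard SQL) rather than the destination table, so it does not control partitioning. A minimal sketch, assuming standard SQL and a hypothetical cutoff parameter:

sql = """
    SELECT *
    FROM `another_dataset.another_table`
    WHERE created_at >= @cutoff
"""

bq_query = BigQueryOperator(
    task_id='bq_query_parameterized',
    sql=sql,
    use_legacy_sql=False,
    # BigQuery queryParameters structure: name, type and value of each parameter
    query_params=[{
        'name': 'cutoff',
        'parameterType': {'type': 'TIMESTAMP'},
        'parameterValue': {'value': '2018-05-24 00:00:00'},
    }],
    bigquery_conn_id='my_bq_connection',
    dag=dag)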
You first need to create an empty partitioned destination table. Follow the instructions here: link to create an empty partitioned table.
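To create that empty day-partitioned table programmatically, here is a minimal sketch using the google-cloud-bigquery Python client; note it relies on the TimePartitioning API from a newer client release (the google-cloud==0.27.0 mentioned in the question exposes a different interface), and the project id is a placeholder:

from google.cloud import bigquery

client = bigquery.Client()
# 'my_project' is a placeholder; the schema can stay empty because the
# query job with CREATE_IF_NEEDED / WRITE_TRUNCATE will populate it.
table = bigquery.Table('my_project.my_dataset.my_table')
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY)
client.create_table(table)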
Then run the Airflow pipeline below again. You can try this code:
import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date

with DAG(dag_id='my_dags.my_dag') as dag:

    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    sql = """
    SELECT *
    FROM `another_dataset.another_table`
    """

    bq_query = BigQueryOperator(bql=sql,
                                destination_dataset_table='{{ params.t_name }}',
                                task_id='bq_query',
                                bigquery_conn_id='my_bq_connection',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                params={'t_name': table_name},
                                dag=dag
                                )

    start >> bq_query >> end
So what I did is create a dynamic table-name variable and pass it to the BQ operator through the templated destination_dataset_table field.