 

How to perform UPSERT when loading data from Google Storage to BigQuery?

BigQuery supports the following write dispositions:

WRITE_APPEND - Specifies that rows may be appended to an existing table.

WRITE_EMPTY - Specifies that the output table must be empty.

WRITE_TRUNCATE - Specifies that write should replace a table.

None of them fits the purpose of an UPSERT operation.
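A toy in-memory model makes the gap concrete. The sketch below assumes a table is just a Python list of order dicts with hypothetical `order_id` and `status` fields; it only illustrates the documented behavior of each disposition and is not how BigQuery implements loads.

```python
from typing import Dict, List

Row = Dict[str, str]


def load_rows(table: List[Row], batch: List[Row], disposition: str) -> List[Row]:
    """Toy model of BigQuery load write dispositions (illustration only)."""
    if disposition == "WRITE_APPEND":
        # New rows are added after the existing ones; nothing is updated.
        return table + batch
    if disposition == "WRITE_EMPTY":
        # The load is only allowed when the destination has no rows.
        if table:
            raise ValueError("destination table is not empty")
        return list(batch)
    if disposition == "WRITE_TRUNCATE":
        # Existing rows are discarded and replaced by the batch.
        return list(batch)
    raise ValueError(f"unknown disposition: {disposition}")


# Hypothetical sample data: order 2 changes status, order 3 is new.
existing = [
    {"order_id": "1", "status": "new"},
    {"order_id": "2", "status": "on hold"},
]
incoming = [
    {"order_id": "2", "status": "sent"},
    {"order_id": "3", "status": "new"},
]

appended = load_rows(existing, incoming, "WRITE_APPEND")
truncated = load_rows(existing, incoming, "WRITE_TRUNCATE")
```

Appending leaves order 2 in the table twice (once "on hold", once "sent"), and truncating drops order 1 entirely; neither yields one up-to-date row per order, which is what an upsert would give.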

I'm importing an orders JSON file to Google Storage and want to load it into BigQuery. Naturally, some records will be new while others already exist from previous loads and need to be updated (for example, when an order's status changes: new / on hold / sent / refund, etc.).

I'm using Airflow, but my question is generic:

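# Airflow 1.10.x (contrib) import path for this operator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

update_bigquery = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_orders_to_BigQuery',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    schema_fields=dc(),  # dc() returns the table schema (defined elsewhere)
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',  # overwrites the table on every run
    skip_leading_rows=1,  # a CSV option in BigQuery; not meaningful for JSON
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)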

This code uses WRITE_TRUNCATE, which means the existing table data is overwritten with the contents of the loaded file.

How can I modify it to support UPSERT?

Is my only option to query the table for existing orders that also appear in the JSON, delete them, and then perform the LOAD?

asked Jan 01 '23 23:01 by Programmer120


1 Answer

Instead of running a GoogleCloudStorageToBigQueryOperator, you could run a query that would give you the same result as an upsert.

Example from https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement:

MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.product = S.product
WHEN MATCHED THEN
  UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
  INSERT (product, quantity) VALUES(product, quantity)

This query will:

  • Compare table T (the current data) with S (the updates), matching rows on product.
  • If a row in S matches an existing row in T, run an UPDATE on that row.
  • If S contains a product that does not yet exist in T, INSERT it as a new row.
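To check your reading of the MERGE, the same logic can be replayed in plain Python. This is a minimal sketch that models only the documentation example above (rows as dicts with `product` and `quantity`); the sample rows are made up, and row order in a real BigQuery table is not guaranteed.

```python
from collections import Counter
from typing import Any, Dict, List

Row = Dict[str, Any]


def merge_inventory(target: List[Row], source: List[Row]) -> List[Row]:
    """Mimic the documentation's MERGE on in-memory rows (illustration only)."""
    target_keys = {row["product"] for row in target}
    # BigQuery rejects a MERGE where one target row is matched by several
    # source rows; model that check for source keys that hit the target.
    counts = Counter(row["product"] for row in source)
    clashes = [k for k, n in counts.items() if n > 1 and k in target_keys]
    if clashes:
        raise ValueError(f"multiple source rows match target rows: {clashes}")

    by_product = {row["product"]: row for row in source}
    result: List[Row] = []
    for t in target:
        s = by_product.get(t["product"])
        if s is not None:
            # WHEN MATCHED THEN UPDATE SET quantity = T.quantity + S.quantity
            result.append({**t, "quantity": t["quantity"] + s["quantity"]})
        else:
            # Target rows without a match are left unchanged.
            result.append(dict(t))
    for s in source:
        if s["product"] not in target_keys:
            # WHEN NOT MATCHED THEN INSERT (product, quantity)
            result.append({"product": s["product"], "quantity": s["quantity"]})
    return result


inventory = [{"product": "dryer", "quantity": 30}, {"product": "oven", "quantity": 5}]
arrivals = [{"product": "dryer", "quantity": 20}, {"product": "toaster", "quantity": 10}]
merged = merge_inventory(inventory, arrivals)
# merged: dryer -> 50 (updated), oven -> 5 (untouched), toaster -> 10 (inserted)
```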

Now, how will BigQuery know about your table S? You can either:

  • Load it into a separate (staging) table in BigQuery with GoogleCloudStorageToBigQueryOperator.
  • Or set up a federated (external) table that reads directly from GCS, which I did in https://medium.com/google-cloud/bigquery-lazy-data-loading-ddl-dml-partitions-and-half-a-trillion-wikipedia-pageviews-cd3eacd657b6
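For the orders case in the question, the first option (load the JSON into a staging table, then MERGE into the main table) could be sketched as below. The function only builds the SQL text; the table and column names (`my_dataset.orders`, `my_dataset.orders_staging`, `order_id`, `status`, `updated_at`) are hypothetical placeholders for your own schema.

```python
from typing import Sequence


def build_orders_merge(target: str, staging: str, key: str,
                       columns: Sequence[str]) -> str:
    """Build a standard-SQL MERGE that upserts `staging` rows into `target`.

    Identifiers are interpolated directly into the SQL text, so they are
    assumed to be trusted names, not user input.
    """
    if key not in columns:
        raise ValueError("the key column must be part of the inserted columns")
    non_key = [c for c in columns if c != key]
    if not non_key:
        raise ValueError("need at least one non-key column to update")
    # Matched rows: overwrite every non-key column with the incoming value.
    set_clause = ",\n    ".join(f"{c} = S.{c}" for c in non_key)
    # Unmatched rows: insert all columns from the staging row.
    col_list = ", ".join(columns)
    val_list = ", ".join(f"S.{c}" for c in columns)
    return (
        f"MERGE `{target}` T\n"
        f"USING `{staging}` S\n"
        f"ON T.{key} = S.{key}\n"
        "WHEN MATCHED THEN\n"
        f"  UPDATE SET\n    {set_clause}\n"
        "WHEN NOT MATCHED THEN\n"
        f"  INSERT ({col_list}) VALUES ({val_list})"
    )


sql = build_orders_merge(
    target="my_dataset.orders",           # hypothetical main table
    staging="my_dataset.orders_staging",  # hypothetical staging table
    key="order_id",                       # hypothetical unique order key
    columns=["order_id", "status", "updated_at"],
)
print(sql)
```

The staging table could be filled by the existing load task with WRITE_TRUNCATE pointed at the staging table instead of the main one, and the generated SQL then run as a standard-SQL query job, for example with the google-cloud-bigquery client (`bigquery.Client().query(sql).result()`). If one file can contain the same order_id more than once, deduplicate the staging rows first: BigQuery rejects a MERGE in which several source rows match the same target row.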
answered Jan 04 '23 11:01 by Felipe Hoffa