
BigQuery - Time Series and most efficient way to select the 'latest' record

In our BQ design we have a customer table (nested raw data) which is event sourced from our microservices layer (consuming Kinesis streams), where each event carries the latest entity snapshot for the entity it relates to (the post-processing image after the change). A sort of modern change data capture, I guess.

This latest snapshot in each event is how we populate BigQuery - it is extracted and loaded into BigQuery (via the Apache Spark structured streaming connector) in an APPEND ONLY mode. (This is different from, say, mutating and updating one row for a given ID.)

So given this is append only, the size of this table will of course grow over time - one entry per change event. However, it quite nicely gives us a full time series of customer state and changes (a requirement for us), and is immutable as such. We can rebuild the full warehouse by replaying the events, for example.... enough on the context.

One consequence of this is that loading into BigQuery may result in duplicates (e.g. if Spark errors and retries a micro batch - BQ isn't an idempotent structured streaming sink when loading via load jobs - or just because, given the distributed nature of things, it's generally possible). Streaming inserts might be something to look into later, as they help with deduping.....
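
(As a side note, a quick sanity check for such duplicates - just a rough sketch against the same table and fields as my view below - could be something like:)

#standardSQL
SELECT
  id,
  metadata.lastUpdated AS lastUpdated,
  COUNT(*) AS dupCount
FROM
  `project.dataset.customer_refdata`
GROUP BY
  1, 2
HAVING
  COUNT(*) > 1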

The result of this architecture is that I need a view on top of the raw time series data (which, remember, can occasionally have duplicates) that returns the LATEST record under these conditions.

Latest is determined by a metadata struct field on the customer record (metadata.lastUpdated) - the row with MAX(metadata.lastUpdated) is the latest. This is guaranteed by our MS layer.

This is a true event-time timestamp as well. The table is DAY partitioned and has a _PARTITIONTIME column, but this is just an ingest time and I can't use it. It would be great when I can specify a column to be used as the partition time! (wishlist).
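
(Just to illustrate that wishlist item - purely hypothetical on my side, since I can't do this today, and the syntax below is only a guess at what declaring an event-time-partitioned copy might look like:)

-- hypothetical: a copy of the table partitioned on the parsed event-time column
CREATE TABLE `project.dataset.customer_refdata_by_event_time`
PARTITION BY DATE(lastUpdatedTimestampUTC)
AS
SELECT
  cus.*,
  PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", cus.metadata.lastUpdated) AS lastUpdatedTimestampUTC
FROM
  `project.dataset.customer_refdata` cus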

A duplicate will be two rows with the SAME customer 'id' AND 'metadata.lastUpdated' - so filtering on MAX(metadata.lastUpdated) could still return 2 rows. That means I also need

ROW_NUMBER() OVER (PARTITION BY ....)

in my view, so I can select rowNum = 1 and return only one row where there are dups.

Ok, so enough words/context (sorry) - below is my view SQL to get the latest. It works in my tests, but I am not sure it is the most efficient way to achieve my outcome once the size of the table / number of rows gets large, and I was wondering if any BigQuery boffins out there might have a more efficient / clever way to do this? My SQL is OK, but I'm by no means an expert in performance tuning, and in particular not in the best ways to write SQL for BQ performance.

I was just hoping to be able to have all the data in one table and rely on the power of the Dremel engine to just query it, rather than needing to have multiple tables or do anything too complex.

So my SQL is below. Note - my timestamp is ingested as a string, so I need to PARSE it in the view too.

WITH
  cus_latest_watermark AS (
  SELECT
    id,
    MAX(PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", metadata.lastUpdated)) AS maxLastUpdatedTimestampUTC
  FROM
    `project.dataset.customer_refdata`
  GROUP BY
    id ),
  cust_latest_rec_dup AS (
  SELECT
    cus.*,
    ROW_NUMBER() OVER (PARTITION BY cus.id ORDER BY cus.id) AS rowNum
  FROM
    `project.dataset.customer_refdata` cus
  JOIN
    cus_latest_watermark
  ON
    cus.id = cus_latest_watermark.id
    AND PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", cus.metadata.lastUpdated) = cus_latest_watermark.maxLastUpdatedTimestampUTC)
SELECT
  cust_latest_rec_dup.* EXCEPT(rowNum)
FROM
  cust_latest_rec_dup
WHERE
  rowNum = 1

Thanks!

asked Jul 14 '17 by Kurt Maile

2 Answers

I'm a big fan of Mikhail, and we've been writing ROW_NUMBER() OVER(... ORDER BY ...) queries like this for a long time - but let me propose an alternative made possible thanks to #standardSQL.

This query fails because it has too many elements to ORDER BY in a single partition:

#standardSQL
SELECT *
FROM (
  SELECT repo.name, type, actor.id as actor, payload, created_at
    , ROW_NUMBER() OVER(PARTITION BY actor.id ORDER BY created_at DESC) rn
  FROM `githubarchive.month.201706` 
)
WHERE rn=1
ORDER BY created_at
LIMIT 100


"Error: Resources exceeded during query execution."

Meanwhile this query runs in 15 seconds:

#standardSQL
SELECT actor, event
FROM (
  SELECT actor.id actor, 
    ARRAY_AGG(
      STRUCT(type, repo.name, payload, created_at) 
      ORDER BY created_at DESC LIMIT 1
    ) events
  FROM `githubarchive.month.201706` 
  GROUP BY 1
), UNNEST(events) event
ORDER BY event.created_at
LIMIT 100

This is because the ORDER BY ... LIMIT 1 inside ARRAY_AGG is allowed to drop everything except the top record within each GROUP BY group.

And if you want all records, the equivalent to SELECT * (thanks Elliott):

#standardSQL
SELECT event.* FROM (
  SELECT ARRAY_AGG(
    t ORDER BY t.created_at DESC LIMIT 1
  )[OFFSET(0)]  event
  FROM `githubarchive.month.201706` t 
  GROUP BY actor.id
)
ORDER BY created_at
LIMIT 100
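
Applied to the customer table from the question, this last pattern might look roughly like the following (just a sketch - I'm assuming the same table name, id field and timestamp format as in the question):

#standardSQL
SELECT latest.*
FROM (
  SELECT ARRAY_AGG(
    t ORDER BY PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", t.metadata.lastUpdated) DESC LIMIT 1
  )[OFFSET(0)] latest
  FROM `project.dataset.customer_refdata` t
  GROUP BY id
)

When two rows tie on metadata.lastUpdated (the duplicate case described in the question), the LIMIT 1 simply keeps one of them, which is the behaviour the view needs.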
answered Oct 12 '22 by Felipe Hoffa

Try below for BigQuery Standard SQL

#standardSQL
WITH cus_watermark AS (
  SELECT
    *,
    PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", metadata.lastUpdated) AS UpdatedTimestampUTC
  FROM `project.dataset.customer_refdata`
),
cust_latest_rec_dup AS (
  SELECT 
    *,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY UpdatedTimestampUTC DESC) AS rowNum
  FROM cus_watermark
)
SELECT * EXCEPT(rowNum)
FROM cust_latest_rec_dup
WHERE rowNum = 1  

You can play with / test this approach using the dummy data below:

#standardSQL
WITH `project.dataset.customer_refdata` AS (
  SELECT 1 AS id, '2017-07-14 16:47:27' AS lastUpdated UNION ALL
  SELECT 1, '2017-07-14 16:47:27' UNION ALL
  SELECT 1, '2017-07-14 17:47:27' UNION ALL
  SELECT 1, '2017-07-14 18:47:27' UNION ALL
  SELECT 2, '2017-07-14 16:57:27' UNION ALL
  SELECT 2, '2017-07-14 17:57:27' UNION ALL
  SELECT 2, '2017-07-14 18:57:27' 
),
cus_watermark AS (
  SELECT
    *,
    PARSE_TIMESTAMP("%Y-%m-%d %T", lastUpdated) AS UpdatedTimestampUTC
  FROM `project.dataset.customer_refdata`
),
cust_latest_rec_dup AS (
  SELECT 
    *,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY UpdatedTimestampUTC DESC) AS rowNum
  FROM cus_watermark
)
SELECT * EXCEPT(rowNum)
FROM cust_latest_rec_dup
WHERE rowNum = 1
answered Oct 12 '22 by Mikhail Berlyant