The Problem
We have a Delta Lake setup on top of ADLS Gen2 with the following tables:

- bronze.DeviceData: partitioned by arrival date (Partition_Date)
- silver.DeviceData: partitioned by event date and hour (Partition_Date and Partition_Hour)

We ingest large amounts of data (>600M records per day) from an event hub into bronze.DeviceData (append-only). We then process the new files in a streaming fashion and upsert them into silver.DeviceData with the Delta MERGE command (see below).

The data arriving in the bronze table can contain data from any partition in silver (e.g. a device may send historic data that it cached locally). However, >90% of the data arriving on any given day belongs to the partitions Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS). Therefore, to upsert the data, we run two Spark jobs: a "fast" job that merges only the data falling into those three recent partitions, and a "slow" job that merges the remaining historic data (a sketch of this split follows below).
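To make the split concrete, here is a minimal sketch of how a bronze micro-batch could be divided into the fast and slow paths; the names splitBatch and bronzeBatch are hypothetical and only illustrate the partition filter, not the actual jobs:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, current_date, date_add, date_sub}

// hypothetical helper: split an incoming bronze micro-batch into the "fast" path
// (yesterday/today/tomorrow, >90% of the volume) and the "slow" path (everything else)
def splitBatch(bronzeBatch: DataFrame): (DataFrame, DataFrame) = {
  val isRecent = col("Partition_Date").between(
    date_sub(current_date(), 1), // yesterday
    date_add(current_date(), 1)  // tomorrow
  )
  val fast = bronzeBatch.filter(isRecent)  // merged with priority into silver
  val slow = bronzeBatch.filter(!isRecent) // historic data, handled by the slow job
  (fast, slow)
}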
Now we come to the problem: although the amount of data is orders of magnitude smaller in the "slow" job, it runs for days just to process a single day of slow bronze data, even on a big cluster. The reason is simple: it has to read and update many silver partitions (>1000 date partitions at times), and since the updates are small but the date partitions can be gigabytes in size, these MERGE commands are inefficient.
Furthermore, as time goes on, this slow job will become slower and slower, since the silver partitions it touches keep growing.
Questions
Additional Info
CREATE TABLE silver.DeviceData (
DeviceID LONG NOT NULL, -- the ID of the device that sent the data
DataType STRING NOT NULL, -- the type of data it sent
Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
Value DOUBLE NOT NULL, -- the value that the device sent
UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'
import io.delta.tables.DeltaTable

val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)
val batch = ... // the streaming update batch

// the dates and hours that we want to upsert, for partition pruning,
// collected from the streaming update batch
val dates = "..."
val hours = "..."

val mergeCondition = s"""
  silver.Partition_Date IN ($dates)
  AND silver.Partition_Hour IN ($hours)
  AND silver.Partition_Date = batch.Partition_Date
  AND silver.Partition_Hour = batch.Partition_Hour
  AND silver.DeviceID = batch.DeviceID
  AND silver.Timestamp = batch.Timestamp
  AND silver.DataType = batch.DataType
"""

silverTable.alias("silver")
  .merge(batch.alias("batch"), mergeCondition)
  // only update if the incoming event is newer
  .whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll()
  .whenNotMatched().insertAll()
  .execute()
On Databricks, there are several ways to optimize the performance of the merge into operation:

- OPTIMIZE creates files of 1 GB by default, and these large files need to be rewritten on every merge. You can use spark.databricks.delta.optimize.maxFileSize to set the file size to the 32-64 MB range so the merge rewrites less data (see the sketch after this list).
- This presentation from Spark Summit talks about optimization of the merge into operation - what metrics to watch, etc.
- I'm not 100% sure that you need the condition silver.Partition_Date IN ($dates) AND silver.Partition_Hour IN ($hours) - without it you may read more data than required if the incoming data doesn't contain specific partitions, but checking this requires looking at the execution plan. This knowledge base article explains how to make sure that merge into uses partition pruning.
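As a rough sketch of the first point, assuming the maxFileSize value is given in bytes and that silver.DeviceData is registered in the metastore (the partition predicate below is only a placeholder):

// reduce the target file size for OPTIMIZE so that small merges into old
// partitions rewrite less data (33554432 bytes = 32 MB)
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", "33554432")

// re-compact only the partitions that the slow job has touched
spark.sql("OPTIMIZE silver.DeviceData WHERE Partition_Date = '2021-06-01'")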
Update, December 2021: in newer DBR versions (DBR 9+) there is a new functionality called Low Shuffle Merge that prevents shuffling of unmodified data, so the merge happens much faster. It can be enabled by setting spark.databricks.delta.merge.enableLowShuffle to true.
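For example, this could be switched on per session on a DBR 9.x cluster (a sketch; newer runtimes may enable the feature by default):

// enable Low Shuffle Merge for MERGE operations in this session
spark.conf.set("spark.databricks.delta.merge.enableLowShuffle", "true")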