
Processing upserts on a large number of partitions is not fast enough

The Problem

We have a Delta Lake setup on top of ADLS Gen2 with the following tables:

  • bronze.DeviceData: partitioned by arrival date (Partition_Date)
  • silver.DeviceData: partitioned by event date and hour (Partition_Date and Partition_Hour)

We ingest large amounts of data (>600M records per day) from an event hub into bronze.DeviceData (append-only). We then process the new files in a streaming fashion and upsert them into silver.DeviceData with the Delta MERGE command (see below).

The data arriving in the bronze table can contain data from any partition in silver (e.g. a device may send historic data that it cached locally). However, >90% of the data arriving on any given day is from the partitions Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS). Therefore, to upsert the data, we have the following two Spark jobs:

  • "Fast": processes the data from the three date partitions above. The latency is important here, so we prioritize this data
  • "Slow": processes the rest (anything but these three date partitions). The latency doesn't matter so much, but it should be within a "reasonable" amount of time (not more than a week I'd say)
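
As an illustration, the fast/slow split above boils down to a partition filter on the incoming bronze data. This is a hedged sketch only: `splitFastSlow` and `bronzeBatch` are made-up names, not our actual code; the column name is from the schema below.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, current_date, date_add}

// Hypothetical helper: split a bronze micro-batch into the "fast" slice
// (event dates within +/- 1 day of today) and the "slow" remainder.
def splitFastSlow(bronzeBatch: DataFrame): (DataFrame, DataFrame) = {
  val isFast = col("Partition_Date").between(
    date_add(current_date(), -1),
    date_add(current_date(), 1))
  (bronzeBatch.filter(isFast), bronzeBatch.filter(!isFast))
}
```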

Now we come to the problem: although the amount of data is orders of magnitude smaller in the "slow" job, it runs for days just to process a single day of slow bronze data, even on a big cluster. The reason is simple: it has to read and update many silver partitions (>1000 date partitions at times), and since the updates are small but the date partitions can be gigabytes each, these MERGE commands are inefficient.

Furthermore, as time goes on, this slow job will become slower and slower, since the silver partitions it touches will grow.

Questions

  1. Is our partitioning scheme and the fast/slow Spark job setup generally a good way to approach this problem?
  2. What could be done to improve this setup? We would like to reduce the costs and the latency of the slow job, and find a way so that it scales with the amount of data arriving in bronze on any given day rather than with the size of the silver table

Additional Info

  • we need the MERGE command, as certain upstream services can re-process historic data, which should then update the silver table as well
  • the schema of the silver table:
CREATE TABLE silver.DeviceData (
  DeviceID LONG NOT NULL, -- the ID of the device that sent the data
  DataType STRING NOT NULL, -- the type of data it sent
  Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
  Value DOUBLE NOT NULL, -- the value that the device sent
  UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
  Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
  Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'
  • our MERGE command:
val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)

val batch = ... // the streaming update batch

// the dates and hours that we want to upsert, for partition pruning
// collected from the streaming update batch
val dates = "..."
val hours = "..."

val mergeCondition = s"""
  silver.Partition_Date IN ($dates)
  AND silver.Partition_Hour IN ($hours)
  AND silver.Partition_Date = batch.Partition_Date
  AND silver.Partition_Hour = batch.Partition_Hour
  AND silver.DeviceID = batch.DeviceID
  AND silver.Timestamp = batch.Timestamp
  AND silver.DataType = batch.DataType
"""

silverTable.alias("silver")
  .merge(batch.alias("batch"), mergeCondition)
  // only merge if the event is newer
  .whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll
  .whenNotMatched.insertAll
  .execute
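
For completeness, the elided `dates` and `hours` values above are just SQL IN-list strings collected from the update batch. A hedged sketch of how they could be built (`pruningLists` is a made-up helper name):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: build the IN-list strings for partition pruning
// from the distinct partition values present in the update batch.
def pruningLists(batch: DataFrame): (String, String) = {
  val dates = batch.select("Partition_Date").distinct().collect()
    .map(row => s"'${row.getDate(0)}'").mkString(", ")
  val hours = batch.select("Partition_Hour").distinct().collect()
    .map(_.getInt(0)).mkString(", ")
  (dates, hours)
}
```

Note that the two IN lists together describe the cross product of dates and hours, so more partitions may be scanned than actually appear in the batch.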
hbrgnr asked Mar 16 '21
1 Answer

On Databricks, there are several ways to optimize the performance of the MERGE INTO operation:

  • Perform OPTIMIZE with ZORDER on the columns that are part of the join condition. This may depend on the specific DBR version: older versions (prior to 7.6, IIRC) used a real Z-order algorithm that works well for a smaller number of columns, while DBR 7.6+ uses Hilbert space-filling curves by default
  • Use smaller file sizes - by default, OPTIMIZE creates files of 1 GB that then need to be rewritten. You can use spark.databricks.delta.optimize.maxFileSize to set the target file size to the 32-64 MB range, so that less data is rewritten
  • Use conditions on partitions of the table (you're already doing that)
  • Don't use auto-compaction, because it can't do Z-ordering; instead, run an explicit OPTIMIZE with ZORDER. See the documentation for details
  • Tune column indexing so that statistics are collected only for the columns required by your condition and queries. This is only partially related to the merge itself, but it can slightly improve write speed, because no statistics will be collected for columns that aren't used in queries.
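
A rough sketch of how several of these points could look in practice. This is hedged: the table name is from the question, the 64 MB file size, 7-day OPTIMIZE window, and 4-column statistics count are assumed example values, not recommendations.

```scala
// Smaller target size for files written by OPTIMIZE (value in bytes);
// 64 MB here, so merges rewrite less data.
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 64 * 1024 * 1024)

// Disable auto-compaction in favor of an explicit OPTIMIZE ... ZORDER.
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "false")

// Collect statistics only for the first N columns (the merge key columns
// should be among the first N columns of the table for this to help).
spark.sql("""
  ALTER TABLE silver.DeviceData
  SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '4')
""")

// Z-order by the non-partition join keys; restricting to recent date
// partitions (an assumed 7-day window) keeps the OPTIMIZE itself cheap.
spark.sql("""
  OPTIMIZE silver.DeviceData
  WHERE Partition_Date >= current_date() - INTERVAL 7 DAYS
  ZORDER BY (DeviceID, DataType, Timestamp)
""")
```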

This presentation from Spark Summit talks about optimizing MERGE INTO - what metrics to watch, etc.

I'm not 100% sure that you need the condition silver.Partition_Date IN ($dates) AND silver.Partition_Hour IN ($hours) - if specific partitions aren't present in the incoming data, you may read more data than required - but verifying that requires looking at the execution plan. This knowledge base article explains how to make sure that MERGE INTO uses partition pruning.

Update, December 2021: Newer DBR versions (DBR 9+) have new functionality called Low Shuffle Merge that avoids shuffling unmodified data, so the merge happens much faster. It can be enabled by setting spark.databricks.delta.merge.enableLowShuffle to true.
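
Enabling it is just a session-level config flag:

```scala
// Enable Low Shuffle Merge (DBR 9+) for the current session.
spark.conf.set("spark.databricks.delta.merge.enableLowShuffle", "true")
```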

Alex Ott answered Oct 16 '22