 

How to compare two versions of delta table to get changes similar to CDC?

Tags:

delta-lake

If I want to use delta time-travel to compare two versions to get changes similar to CDC, how to do that?

I can see two options:

  1. In SQL you have an EXCEPT/MINUS query where you compare all data in one table against another. I would assume you can use that here too, correct? But is that fast enough when the versions you compare keep getting bigger and you always need to compare every row against all rows of the latest version?

  2. Does Delta compute some kind of hash per row so it can do this very fast, or is that very time-consuming for Delta?
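To make option 1 concrete, here is a minimal sketch of EXCEPT/MINUS semantics using plain Scala collections instead of Spark. It is an illustration of what the engine has to do, not how Spark implements it: every row of the newer version is checked against the full set of older rows, which is why the cost grows with the table.

```scala
// Sketch of EXCEPT/MINUS semantics with plain Scala collections.
// Rows are modeled as simple tuples; in Spark these would be DataFrame rows.
object ExceptSketch {
  def except(newer: Seq[(Int, String)], older: Seq[(Int, String)]): Seq[(Int, String)] = {
    val olderSet = older.toSet            // the whole older version must be materialized
    newer.filterNot(olderSet.contains)    // keep rows present only in the newer version
  }
}
```

Usage: `ExceptSketch.except(v1Rows, v0Rows)` yields the rows that appear only in the newer version, which for an append-only table is exactly the inserted data.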


Found on Slack

Jacek Laskowski asked Jan 04 '20 14:01


People also ask

How do you implement SCD Type 2 in Databricks?

These operations require updating the existing rows to mark the previous values of the keys as old, and then inserting the new rows as the latest values. Given a source table with the updates and a target table with the dimensional data, SCD Type 2 can be expressed with a merge.

Can we update a Delta table?

Update a table. You can update data that matches a predicate in a Delta table. For example, to fix a spelling mistake in the eventType column, you can run an UPDATE from Scala.

How do I compare two Dataframes in Databricks?

Let df1 and df2 be two DataFrames, where df1 has columns (A, B, C) and df2 has columns (D, C, B). You can create a new DataFrame df3 as the intersection of df1 and df2 joined on columns B and C; df3 will then contain only the rows from df1 and df2 that satisfy that condition.


1 Answer

You can compute the difference of two versions of the table, but as you guessed it's expensive to do. It's also tricky to compute the actual difference when the Delta table has changes other than appends.

Usually when people ask about this, they're trying to design their own system that gives them exactly-once processing of data from Delta to somewhere else; Spark Structured Streaming with a Delta source already exists to do exactly that.

If you do want to write your own, you can read the transaction log directly (the protocol spec is at https://github.com/delta-io/delta/blob/master/PROTOCOL.md) and use the actions in the versions between the two you're comparing to figure out which files contain the changes to read.
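As a starting point for reading the log by hand, here is a small sketch of locating the commit files that carry the changes between two versions. Per PROTOCOL.md, each commit is a JSON file under `_delta_log/` named with the zero-padded 20-digit version number; the object and method names below are illustrative, not a Delta API.

```scala
// Sketch: which _delta_log commit files to read when diffing v0 against v1.
// Per PROTOCOL.md, commit files are named <zero-padded 20-digit version>.json.
object CommitFiles {
  def commitFileName(version: Long): String = f"$version%020d.json"

  // Commit files carrying the changes between v0 (exclusive) and v1 (inclusive).
  def commitsBetween(v0: Long, v1: Long): Seq[String] =
    ((v0 + 1) to v1).map(commitFileName)
}
```

Reading and replaying the actions in those JSON files tells you which data files were added or removed between the two versions.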


Please note that versions of a Delta table are cached (persisted by Spark), so comparing different versions should be fairly cheap to set up.

val v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/t2")
val v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/t2")
// v0 and v1 are persisted - see Storage tab in web UI

Getting those v0 and v1 isn’t expensive; comparing the two can be both expensive and tricky. If the table is append-only then it’s (v1 - v0); if it’s got upserts then you have to handle (v0 - v1) as well, and if it’s got metadata or protocol changes it gets even trickier.
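The diff logic described above can be sketched with Scala Sets standing in for DataFrames (names here are illustrative): for an append-only table the change set is just (v1 - v0), while with upserts or deletes you also need (v0 - v1) to see what disappeared.

```scala
// Sketch of the version-diff logic with Sets in place of DataFrames.
object VersionDiff {
  case class Diff[A](added: Set[A], removed: Set[A])

  // added   = rows only in the newer version (inserts/updated values)
  // removed = rows only in the older version (deletes/pre-update values)
  def diff[A](v0: Set[A], v1: Set[A]): Diff[A] =
    Diff(added = v1 -- v0, removed = v0 -- v1)
}
```

For an append-only table, `removed` is always empty and `added` is the CDC insert stream; an upsert shows up as one row in `removed` (the old value) plus one in `added` (the new value), and correlating the two back into a single "update" event is the tricky part.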

And when you do all that logic yourself it’s suspiciously similar to re-implementing DeltaSource.


You may then consider the following:

import org.apache.spark.sql.delta.DeltaLog

// Access the table's transaction log directly
val log = DeltaLog.forTable(spark, "/tmp/delta/t2")

val v0 = log.getSnapshotAt(0)
val actionsAtV0 = v0.state   // the actions reconstructing version 0

val v1 = log.getSnapshotAt(1)
val actionsAtV1 = v1.state   // the actions reconstructing version 1

actionsAtV0 and actionsAtV1 are all the actions that brought the Delta table to versions 0 and 1, respectively, and can be treated as a CDC-like view of the table.

That's basically reading the transaction log, except it uses some of Delta's internal APIs to make it easier.
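To see what "replaying the actions" means, here is a toy model of the idea: a snapshot's state is the set of data files still live after applying add-file and remove-file actions in order. The case-class names mirror Delta's action types from PROTOCOL.md, but this is an illustration, not Delta's actual classes.

```scala
// Toy model of Delta log replay: the live file set of a snapshot.
object LogReplay {
  sealed trait Action
  case class AddFile(path: String) extends Action
  case class RemoveFile(path: String) extends Action

  // Replay actions in commit order to compute the files live at a version.
  def replay(actions: Seq[Action]): Set[String] =
    actions.foldLeft(Set.empty[String]) {
      case (live, AddFile(p))    => live + p
      case (live, RemoveFile(p)) => live - p
    }
}
```

Diffing the live file sets of two snapshots tells you exactly which data files to read to compute the changes between them.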

Jacek Laskowski answered Jan 02 '23 23:01