If I want to use Delta time travel to compare two versions and get the changes, similar to CDC, how do I do that?
I can see two options:
In SQL you have EXCEPT/MINUS queries, where you compare all the data of one table against another. I assume you could use that here as well, correct? But is that fast enough once the versions you compare keep getting bigger and bigger and you always need to compare every row against every row of the latest version?
Does Delta keep some kind of hash per row so it can do that very fast, or is that very time-consuming for Delta?
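For concreteness, the EXCEPT idea would look something like this (the path is a placeholder; the VERSION AS OF SQL syntax needs a reasonably recent Delta Lake release, otherwise the DataFrame reader's versionAsOf option shown further down does the same):

// All rows in version 1 that are not in version 0.
val changes = spark.sql("""
  SELECT * FROM delta.`/tmp/delta/t2` VERSION AS OF 1
  EXCEPT
  SELECT * FROM delta.`/tmp/delta/t2` VERSION AS OF 0
""")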
Found on Slack:
These operations require updating the existing rows to mark the previous values of the keys as old, and then inserting the new rows as the latest values. Given a source table with the updates and a target table with the dimensional data, SCD Type 2 can be expressed with MERGE.
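A rough sketch of that MERGE shape, not a definitive implementation: the table path and the column names (customerId, address, current, effectiveDate, endDate) are assumptions, updates is an assumed DataFrame of incoming source rows, and the full SCD Type 2 recipe in the Delta docs additionally stages a union of the updates so one MERGE can both close the old row and insert its new version.

import io.delta.tables._

val target = DeltaTable.forPath(spark, "/tmp/delta/customers")

target.as("t")
  .merge(updates.as("u"), "t.customerId = u.customerId AND t.current = true")
  .whenMatched("t.address <> u.address")
  .updateExpr(Map("current" -> "false", "endDate" -> "u.effectiveDate")) // close the old row
  .whenNotMatched()
  .insertExpr(Map(
    "customerId"    -> "u.customerId",
    "address"       -> "u.address",
    "current"       -> "true",
    "effectiveDate" -> "u.effectiveDate",
    "endDate"       -> "null")) // brand-new key: insert as the latest value
  .execute()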
Update a table. You can update data that matches a predicate in a Delta table. For example, to fix a spelling mistake in the eventType column, you can run the following Scala:
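A sketch along the lines of the docs' example (the table path and the misspelled value 'clck' are assumptions):

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/events")

deltaTable.updateExpr(
  "eventType = 'clck'",           // predicate selecting the bad rows
  Map("eventType" -> "'click'"))  // the new value is an expression, hence the inner quotes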
Let df1 and df2 be two DataFrames, where df1 has columns (A, B, C) and df2 has columns (D, C, B). You can then create a new DataFrame df3 that is the intersection of df1 and df2 conditioned on columns B and C: df3 will contain only those rows from df1 and df2 where that condition is satisfied.
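For example (column names and sample values made up), an inner join on the shared key columns gives exactly that intersection:

import spark.implicits._

val df1 = Seq((1, "x", 10), (2, "y", 20)).toDF("A", "B", "C")
val df2 = Seq(("d1", 10, "x"), ("d2", 99, "z")).toDF("D", "C", "B")

// Keep only the rows whose (B, C) values appear in both DataFrames.
val df3 = df1.join(df2, Seq("B", "C"))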
You can compute the difference of two versions of the table, but as you guessed, it's expensive to do. It's also tricky to compute the actual difference when the Delta table has changes other than appends.
Usually when people ask about this, they're trying to design their own system that gives them exactly-once processing of data from Delta to somewhere else; Spark Structured Streaming with the Delta source already exists to do this.
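A minimal sketch of that existing path (the table, output, and checkpoint paths are placeholders):

// Stream each committed change of the Delta table downstream;
// the checkpoint is what gives you exactly-once processing.
val stream = spark.readStream
  .format("delta")
  .load("/tmp/delta/t2")

stream.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/delta/_checkpoints/t2-downstream")
  .start("/tmp/delta/t2-downstream")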
If you do want to write your own, you can read the transaction log directly (the protocol spec is at https://github.com/delta-io/delta/blob/master/PROTOCOL.md) and use the actions in the versions between the two you're comparing to figure out which files have changes to read.
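As a taste of what that means (the path is a placeholder): each commit is a JSON file of actions under _delta_log, and its add/remove actions name the data files that changed in that version.

// Read the actions committed as version 1 of the table.
val commit = spark.read.json("/tmp/delta/t2/_delta_log/00000000000000000001.json")

// The add/remove columns exist only if the commit contains those action types.
commit.select("add.path", "remove.path").show(truncate = false)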
Please note that versions of a Delta table are cached (persisted by Spark), so comparing the different versions should be fairly cheap.
val v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/t2")
val v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/t2")
// v0 and v1 are persisted - see Storage tab in web UI
Getting those v0 and v1 isn’t expensive; comparing the two can be both expensive and tricky. If the table is append-only then it’s (v1 - v0); if it’s got upserts then you have to handle (v0 - v1) as well, and if it’s got metadata or protocol changes it gets even trickier.
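In DataFrame terms, reusing v0 and v1 from above (exceptAll rather than except so duplicate rows are counted correctly):

// Append-only table: the change set is just what is new in v1.
val added = v1.exceptAll(v0)

// With upserts you also need the rows that disappeared since v0.
val removed = v0.exceptAll(v1)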
And when you do all that logic yourself it’s suspiciously similar to re-implementing DeltaSource.
You may then consider the following:
import org.apache.spark.sql.delta.DeltaLog

// DeltaLog is an internal API, so it can change between Delta releases.
val log = DeltaLog.forTable(spark, "/tmp/delta/t2")
val v0 = log.getSnapshotAt(0)
val actionsAtV0 = v0.state // the actions that make up version 0
val v1 = log.getSnapshotAt(1)
val actionsAtV1 = v1.state // the actions that make up version 1
actionsAtV0 and actionsAtV1 are all the actions that brought the Delta table to versions 0 and 1, respectively, and can be considered a CDC of the Delta table.
That's basically reading the transaction log, except using some of Delta's internal APIs to make it easier.