
Delta merge logic whenMatchedDelete case

I'm working on Delta merge logic and want to delete a row from the Delta table when that row is no longer present in the latest DataFrame read.

My sample DataFrame is shown below:

df = spark.createDataFrame(
    [
        ("Java", "20000"),  # create your data here, be consistent in the types.
        ("PHP", "40000"),
        ("Scala", "50000"),
        ("Python", "10000"),
    ],
    ["language", "users_count"],  # add your column names here
)

Insert the data into a Delta table:

df.write.format("delta").mode("append").saveAsTable("xx.delta_merge_check")

On the next read, I've removed the row ('Python', '10000'), and now I want to delete this row from the Delta table using the Delta merge API.

df_latest = spark.createDataFrame(
    [
        ("Java", "20000"),  # create your data here, be consistent in the types.
        ("PHP", "40000"),
        ("Scala", "50000"),
    ],
    ["language", "users_count"],  # add your column names here
)

I'm using the following code with the Delta merge API.

Read the existing Delta table:

from delta.tables import DeltaTable

test_delta = DeltaTable.forPath(
    spark,
    "wasbs://[email protected]/hive/warehouse/xx/delta_merge_check",
)

Merge the changes:

test_delta.alias("t").merge(
    df_latest.alias("s"),
    "s.language = t.language",
).whenMatchedDelete(
    condition="s.language = true"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Unfortunately, this doesn't delete the row ('Python', '10000') from the Delta table. Is there another way to achieve this? Any help would be much appreciated.

chaitra k asked Oct 27 '25 12:10


1 Answer

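This won't work the way you expect: the basic problem is that your second dataset doesn't carry any information that a row was deleted. whenMatchedDelete only applies to target rows that match some source row, and the removed row has no match, so no clause ever touches it. You need to add the deletion information to the input somehow. There are different approaches, depending on the specific details: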

  • Instead of just removing the row, keep it but add another column that shows whether the row was deleted, and use that column in the delete condition, something like this:
    test_delta.alias("t").merge(
        df_latest.alias("s"),
        "s.language = t.language",
    ).whenMatchedDelete(
        condition="s.is_deleted = true"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
  • Use some other method to find the diff between your destination table and the input data, but this really depends on your logic. If you're able to calculate the diff, you can then use the approach described in the previous item.

  • If your input data is always the full data set, you can simply overwrite the table using overwrite mode. This can be even more performant than merge, because there's no need to join the input against the existing data to find the rows to update or delete.
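The first two approaches can be sketched as a plain-Python model of the merge semantics. This is a toy illustration, not the Delta Lake API: the functions toy_merge and flag_deletions are hypothetical names, rows are plain tuples, and language is assumed to be unique in both inputs. The model shows that merging the latest read as-is leaves the Python row untouched, while adding it back with an is_deleted flag (the diff step, which in PySpark would typically be a left_anti join of the table against df_latest on language) lets whenMatchedDelete remove it.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical plain-Python model of the merge, NOT the Delta Lake API.
# Target rows are (language, users_count); source rows add an is_deleted flag.
# Assumes language is unique in both inputs.
Row = Tuple[str, str]
SourceRow = Tuple[str, str, bool]


def toy_merge(target: List[Row], source: List[SourceRow]) -> List[Row]:
    """Model of a merge on s.language = t.language with
    whenMatchedDelete("s.is_deleted = true"), whenMatchedUpdateAll()
    and whenNotMatchedInsertAll()."""
    source_by_key: Dict[str, SourceRow] = {row[0]: row for row in source}
    result: List[Row] = []
    for lang, count in target:
        src = source_by_key.get(lang)
        if src is None:
            # No matching source row: no whenMatched* clause applies,
            # so the target row is kept unchanged.
            result.append((lang, count))
        elif src[2]:
            # Matched and flagged: whenMatchedDelete drops the row.
            continue
        else:
            # Matched and not flagged: whenMatchedUpdateAll copies the source values.
            result.append((src[0], src[1]))
    target_keys: Set[str] = {lang for lang, _ in target}
    # whenNotMatchedInsertAll: append source rows whose key is new to the target
    # (the flag column is not stored in this model).
    result.extend((lang, count) for lang, count, _ in source if lang not in target_keys)
    return result


def flag_deletions(target: List[Row], latest: List[Row]) -> List[SourceRow]:
    """Diff step: target keys missing from the latest read (what a left_anti
    join of the table against the new data would return in Spark) are added
    back to the input with is_deleted=True."""
    latest_keys: Set[str] = {lang for lang, _ in latest}
    flagged: List[SourceRow] = [(lang, count, False) for lang, count in latest]
    flagged.extend(
        (lang, count, True) for lang, count in target if lang not in latest_keys
    )
    return flagged


target = [("Java", "20000"), ("PHP", "40000"), ("Scala", "50000"), ("Python", "10000")]
latest = [("Java", "20000"), ("PHP", "40000"), ("Scala", "50000")]

# Merging the latest read as-is (nothing flagged): Python has no match, so it stays.
without_flags = toy_merge(target, [(lang, count, False) for lang, count in latest])
print(without_flags)
# [('Java', '20000'), ('PHP', '40000'), ('Scala', '50000'), ('Python', '10000')]

# Merging after the diff step: Python now matches a flagged row and is deleted.
with_flags = toy_merge(target, flag_deletions(target, latest))
print(with_flags)
# [('Java', '20000'), ('PHP', '40000'), ('Scala', '50000')]
```

With real DataFrames the same idea would look roughly like this: the anti-join result gets an is_deleted column set to true, the latest rows get false, the two are combined (for example with unionByName) and the combined DataFrame is used as the merge source.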

Alex Ott answered Oct 29 '25 07:10