I'm working on Delta merge logic and want to delete a row from the Delta table when that row is no longer present in the latest DataFrame read.
My sample DataFrame is shown below:
df = spark.createDataFrame(
    [
        ("Java", "20000"),  # create your data here, be consistent in the types
        ("PHP", "40000"),
        ("Scala", "50000"),
        ("Python", "10000"),
    ],
    ["language", "users_count"],  # add your column names here
)
Insert the data into a Delta table:
df.write.format("delta").mode("append").saveAsTable("xx.delta_merge_check")
On the next read, I've removed the row ('Python', '10000'), and now I want to delete this row from the Delta table using the Delta merge API.
df_latest = spark.createDataFrame(
    [
        ("Java", "20000"),  # create your data here, be consistent in the types
        ("PHP", "40000"),
        ("Scala", "50000"),
    ],
    ["language", "users_count"],  # add your column names here
)
I'm using the code below for the Delta merge API.
Read the existing Delta table:
from delta.tables import DeltaTable
test_delta = DeltaTable.forPath(
    spark,
    "wasbs://[email protected]/hive/warehouse/xx/delta_merge_check",
)
Merge the changes:
test_delta.alias("t") \
    .merge(df_latest.alias("s"), "s.language = t.language") \
    .whenMatchedDelete(condition="s.language = true") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
Unfortunately, this doesn't delete the row ('Python', '10000') from the Delta table. Is there any other way to achieve this? Any help would be much appreciated.
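This won't work the way you think it should. The basic problem is that your second dataset doesn't carry any information that data was deleted: the whenMatched clauses only fire for target rows that match a source row, and the removed row has no match in the source, so nothing touches it. You need to add that information to the input somehow. There are different approaches, depending on the specifics. One is to keep the deleted row in the input instead of dropping it, add a column that marks it as deleted (is_deleted in the code below), and delete on that flag in the merge: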
test_delta.alias("t") \
    .merge(df_latest.alias("s"), "s.language = t.language") \
    .whenMatchedDelete(condition="s.is_deleted = true") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
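The merge above expects the source to carry an is_deleted column. As a minimal sketch of what such an input could look like, assuming the upstream system can keep removed rows and flag them (FLAGGED_ROWS, FLAGGED_COLUMNS and build_flagged_source are hypothetical names used only for illustration):

```python
# Hypothetical input: the same rows as the question, plus an is_deleted flag.
FLAGGED_ROWS = [
    ("Java", "20000", False),
    ("PHP", "40000", False),
    ("Scala", "50000", False),
    ("Python", "10000", True),  # kept in the input, but marked as deleted
]
FLAGGED_COLUMNS = ["language", "users_count", "is_deleted"]


def build_flagged_source(spark):
    """Build the merge source from the flagged rows.

    `spark` is assumed to be an existing SparkSession.
    """
    return spark.createDataFrame(FLAGGED_ROWS, FLAGGED_COLUMNS)
```

With this in place, `df_latest = build_flagged_source(spark)` could be passed to the merge as the source `s`, so that the condition `s.is_deleted = true` has something to evaluate.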
Alternatively, use some other method to find the diff between your destination table and the input data, but this really depends on your logic. If you are able to calculate the diff, you can then use the is_deleted approach from the previous item.
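One possible way to calculate such a diff, sketched under the assumption that language uniquely identifies a row and that both DataFrames have the same columns, is a left anti join of the target against the latest read. The helper name with_delete_flags is hypothetical:

```python
def with_delete_flags(target_df, latest_df, key="language"):
    """Return the latest rows plus the rows that vanished, flagged as deleted.

    Both arguments are assumed to be Spark DataFrames with identical columns;
    `key` is assumed to identify a row uniquely.
    """
    # Rows that exist in the target table but are absent from the latest read.
    removed = target_df.join(latest_df, on=key, how="left_anti").selectExpr(
        "*", "true AS is_deleted"
    )
    # Rows still present in the latest read.
    kept = latest_df.selectExpr("*", "false AS is_deleted")
    # Combine both sets so the merge source carries the deletion information.
    return kept.unionByName(removed)
```

For example, `df_flagged = with_delete_flags(test_delta.toDF(), df_latest)` would produce a source that can be merged with the `s.is_deleted = true` delete condition. The anti join adds an extra read of the target table, so whether this pays off depends on the table size.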
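If your input data is always a full set of data, you can just overwrite the whole table using overwrite mode. This will be even more performant than a merge, because a merge has to find the matching rows and rewrite the files that contain them, whereas an overwrite simply writes the new data.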
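A minimal sketch of the overwrite approach, assuming the latest read really is a complete snapshot and reusing the table name from the question (overwrite_with_snapshot is a hypothetical helper name):

```python
def overwrite_with_snapshot(df_snapshot, table_name="xx.delta_merge_check"):
    """Replace the contents of the Delta table with the latest full snapshot.

    `df_snapshot` is assumed to be a Spark DataFrame holding every row
    that should exist in the table after the write.
    """
    (
        df_snapshot.write
        .format("delta")
        .mode("overwrite")  # replaces the table contents instead of appending
        .saveAsTable(table_name)
    )
```

Calling `overwrite_with_snapshot(df_latest)` would leave only the Java, PHP and Scala rows in the table. Earlier versions of the table stay reachable through Delta time travel until they are vacuumed.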