 

Structured streaming custom deduplication

I have streaming data coming in from Kafka into a DataFrame. I want to remove duplicates based on Id and keep only the latest record for each Id, based on timestamp.

Sample data looks like this:

Id  Name    count   timestamp
1   Vikas   20      2018-09-19T10:10:10
2   Vijay   50      2018-09-19T10:10:20
3   Vilas   30      2018-09-19T10:10:30
4   Vishal  10      2018-09-19T10:10:40
1   Vikas   50      2018-09-19T10:10:50
4   Vishal  40      2018-09-19T10:11:00
1   Vikas   10      2018-09-19T10:11:10
3   Vilas   20      2018-09-19T10:11:20

The output that I am expecting would be :

Id  Name    count   timestamp
1   Vikas   10      2018-09-19T10:11:10
2   Vijay   50      2018-09-19T10:10:20
3   Vilas   20      2018-09-19T10:11:20
4   Vishal  40      2018-09-19T10:11:00

Older duplicates are removed and only the most recent record per Id is kept, based on the timestamp field.

I am using watermarking on the timestamp field. I have tried `df.dropDuplicates`, but it keeps the older records intact and discards anything newer.

My current code is as follows:

df = df.withWatermark("timestamp", "1 Day").dropDuplicates("Id", "timestamp")

How can I implement a custom dedup method so that only the latest record per Id is kept as the unique record?

Any help is appreciated.

Vikas Gite asked Mar 05 '26 22:03

1 Answer

Sort on the timestamp column before dropping the duplicates, and key the deduplication on `Id` alone (keying on both `Id` and `timestamp` would keep every distinct pair, not one row per Id):

df.withWatermark("timestamp", "1 Day")
  .sort($"timestamp".desc)
  .dropDuplicates("Id")
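For clarity, the keep-latest-per-Id logic this relies on can be sketched outside Spark in plain Python. This is only an illustration of the dedup semantics against the sample data above, not the streaming implementation; the tuple layout `(id, name, count, timestamp)` is an assumption mirroring the question's table:

```python
def keep_latest(records):
    """Return one record per Id, keeping the record with the newest timestamp."""
    latest = {}
    for rec in records:
        rec_id, _name, _count, ts = rec
        # Replace the stored record only if this one is newer.
        # ISO-8601 timestamps compare correctly as strings.
        if rec_id not in latest or ts > latest[rec_id][3]:
            latest[rec_id] = rec
    # Sort by Id for stable, readable output.
    return sorted(latest.values())

records = [
    (1, "Vikas",  20, "2018-09-19T10:10:10"),
    (2, "Vijay",  50, "2018-09-19T10:10:20"),
    (3, "Vilas",  30, "2018-09-19T10:10:30"),
    (4, "Vishal", 10, "2018-09-19T10:10:40"),
    (1, "Vikas",  50, "2018-09-19T10:10:50"),
    (4, "Vishal", 40, "2018-09-19T10:11:00"),
    (1, "Vikas",  10, "2018-09-19T10:11:10"),
    (3, "Vilas",  20, "2018-09-19T10:11:20"),
]

for rec in keep_latest(records):
    print(rec)
```

Running this reproduces the expected output table from the question: one row each for Ids 1 through 4, each carrying its latest count and timestamp.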
bp2010 answered Mar 07 '26 16:03


