I have streaming data coming in from Kafka into a DataFrame. I want to remove duplicates based on Id and keep the latest record based on the timestamp.
Sample data is like this :
Id Name count timestamp
1 Vikas 20 2018-09-19T10:10:10
2 Vijay 50 2018-09-19T10:10:20
3 Vilas 30 2018-09-19T10:10:30
4 Vishal 10 2018-09-19T10:10:40
1 Vikas 50 2018-09-19T10:10:50
4 Vishal 40 2018-09-19T10:11:00
1 Vikas 10 2018-09-19T10:11:10
3 Vilas 20 2018-09-19T10:11:20
The output that I am expecting would be :
Id Name count timestamp
1 Vikas 10 2018-09-19T10:11:10
2 Vijay 50 2018-09-19T10:10:20
3 Vilas 20 2018-09-19T10:11:20
4 Vishal 40 2018-09-19T10:11:00
Older duplicates are removed and only the recent records are kept based on the timestamp field.
I am using watermarking on the timestamp field. I have tried "df.dropDuplicates", but it keeps the older record intact and discards anything newer that arrives.
Current code is as follows :
df = df.withWatermark("timestamp", "1 Day").dropDuplicates("Id", "timestamp")
How can we implement a custom dedup method so that the latest record is kept as the unique record?
Any help is appreciated.
Sort on the timestamp column first, then drop duplicates on Id only — if timestamp is part of the duplicate key, two rows with different timestamps are never treated as duplicates, which is why your current code keeps both:
df.withWatermark("timestamp", "1 Day")
.sort($"timestamp".desc)
.dropDuplicates("Id")
Note that this only works on a static DataFrame: Structured Streaming does not support sort on a streaming DataFrame (it raises an AnalysisException), so in a stream you have to apply the sort-and-dedup to each micro-batch, e.g. inside foreachBatch.
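Independent of Spark, the keep-latest-per-Id semantics can be sketched in plain Python (the `rows` list mirrors the question's sample data; the `dedup_keep_latest` helper is mine, for illustration). Inside foreachBatch, the equivalent would be a `row_number()` over `Window.partitionBy("Id").orderBy(desc("timestamp"))` filtered to row 1:

```python
# Rows mirroring the question's sample data: (Id, Name, count, timestamp)
rows = [
    (1, "Vikas", 20, "2018-09-19T10:10:10"),
    (2, "Vijay", 50, "2018-09-19T10:10:20"),
    (3, "Vilas", 30, "2018-09-19T10:10:30"),
    (4, "Vishal", 10, "2018-09-19T10:10:40"),
    (1, "Vikas", 50, "2018-09-19T10:10:50"),
    (4, "Vishal", 40, "2018-09-19T10:11:00"),
    (1, "Vikas", 10, "2018-09-19T10:11:10"),
    (3, "Vilas", 20, "2018-09-19T10:11:20"),
]

def dedup_keep_latest(rows):
    """Keep only the row with the greatest timestamp for each Id.

    ISO-8601 timestamps compare correctly as strings, so a plain
    string comparison is enough here.
    """
    latest = {}
    for row in rows:
        rec_id, ts = row[0], row[3]
        if rec_id not in latest or ts > latest[rec_id][3]:
            latest[rec_id] = row
    return sorted(latest.values())  # sorted by Id for display

for row in dedup_keep_latest(rows):
    print(row)
```

Running this prints exactly the expected output from the question: one row per Id, each carrying the most recent count and timestamp.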