Spark: how to do a dropDuplicates on a DataFrame while keeping the highest-timestamped row [duplicate]

I have a use case where I need to drop duplicate rows of a DataFrame (duplicate meaning they have the same 'id' field) while keeping only the row with the highest 'timestamp' (unix timestamp) field.

I found the dropDuplicates method (I'm using PySpark), but it doesn't give any control over which row is kept.

Can anyone help? Thanks in advance.

arnaud briche asked Apr 14 '16

2 Answers

A manual map and reduce might be needed to provide the functionality you want.

def selectRowByTimeStamp(x, y):
    # Of two rows sharing the same key, keep the one with the larger timestamp
    if x.timestamp > y.timestamp:
        return x
    return y

# Key each row by its id, then reduce each key down to a single row
dataMap = data.map(lambda x: (x.id, x))
uniqueData = dataMap.reduceByKey(selectRowByTimeStamp)

Here we key all of the data by id. Then, as each grouping is reduced, we keep the record with the highest timestamp. Once the reduce finishes, only one record is left per id.
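Outside Spark, the same keep-the-latest reduce logic can be sketched in plain Python; the `Row` namedtuple and the sample data below are made up for illustration, and the dict loop emulates what `reduceByKey` does per key:

```python
from collections import namedtuple

# Stand-in for a Spark Row with the fields from the question
Row = namedtuple("Row", ["id", "timestamp", "data"])

def selectRowByTimeStamp(x, y):
    # Same reducer as above: keep the row with the larger timestamp
    return x if x.timestamp > y.timestamp else y

rows = [
    Row(1, 12345678, "this is a test"),
    Row(1, 23456789, "another test"),
    Row(2, 2345678, "2nd test"),
]

# Emulate reduceByKey: fold each row into a per-id accumulator
latest = {}
for r in rows:
    latest[r.id] = selectRowByTimeStamp(latest[r.id], r) if r.id in latest else r

# latest[1] is the "another test" row; latest[2] is the "2nd test" row
```

Spark's `reduceByKey` applies the same pairwise reducer, just distributed across partitions, which is why the reducer must be associative and commutative (a max-by-timestamp comparison is both).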

David answered Feb 12 '23


You can do something like this:

val df = Seq(
  (1,12345678,"this is a test"),
  (1,23456789, "another test"),
  (2,2345678,"2nd test"),
  (2,1234567, "2nd another test")
).toDF("id","timestamp","data")

+---+---------+----------------+
| id|timestamp|            data|
+---+---------+----------------+
|  1| 12345678|  this is a test|
|  1| 23456789|    another test|
|  2|  2345678|        2nd test|
|  2|  1234567|2nd another test|
+---+---------+----------------+

df.join(
  df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
  $"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
+---+---------+------------+
| id|timestamp|        data|
+---+---------+------------+
|  1| 23456789|another test|
|  2|  2345678|    2nd test|
+---+---------+------------+

If you expect there could be repeated timestamps for an id, you could first drop those exact duplicates:

df.dropDuplicates(Seq("id", "timestamp")).join(
  df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
  $"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
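The group-by-max-and-join idea above can be sketched in plain Python as well; the tuples mirror the example DataFrame, and the `seen` set plays the role of `dropDuplicates` on (id, timestamp) for the tie-breaking case (variable names here are illustrative):

```python
rows = [
    (1, 12345678, "this is a test"),
    (1, 23456789, "another test"),
    (2, 2345678, "2nd test"),
    (2, 1234567, "2nd another test"),
]

# Step 1: like groupBy(id).agg(max(timestamp)) -- each id's highest timestamp
max_ts = {}
for rid, ts, _ in rows:
    max_ts[rid] = max(max_ts.get(rid, ts), ts)

# Step 2: like the join -- keep rows whose (id, timestamp) matches the max;
# the seen set discards repeats of the same (id, timestamp) pair
seen = set()
result = []
for rid, ts, data in rows:
    if ts == max_ts[rid] and (rid, ts) not in seen:
        seen.add((rid, ts))
        result.append((rid, ts, data))
```

As in the DataFrame version, `result` holds one row per id: (1, 23456789, "another test") and (2, 2345678, "2nd test").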
David Griffin answered Feb 12 '23