
How to remove duplicates from a spark data frame while retaining the latest?


I'm using Spark to load JSON files from Amazon S3. I would like to remove duplicates based on two columns of the data frame, retaining the newest (I have a timestamp column). What would be the best way to do it? Please note that the duplicates may be spread across partitions. Can I remove duplicates while retaining the last record without shuffling? I'm dealing with 1 TB of data.

I was thinking of partitioning the data frame by those two columns in such a way that all duplicate records are "consistently hashed" into the same partition, so that a partition-level sort followed by a drop-duplicates step would eliminate all duplicates, keeping just one (a rough sketch is below). I don't know whether that is possible. Any information is appreciated.
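A rough sketch of that idea, with made-up column names (key1 and key2 as the dedup keys, ts as the timestamp):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("dedup-idea").getOrCreate()

# placeholder input; the real frame would come from spark.read.json("s3://...")
df = spark.createDataFrame(
    [(1, 'a', 'x'), (2, 'a', 'x'), (5, 'b', 'y')], ['ts', 'key1', 'key2'])

# hash-partition so every row sharing (key1, key2) lands in the same partition,
# then sort each partition by timestamp descending and drop duplicate keys
deduped = (df.repartition('key1', 'key2')
             .sortWithinPartitions(F.col('ts').desc())
             .dropDuplicates(['key1', 'key2']))

# NOTE: dropDuplicates() does not document which of the duplicate rows it keeps,
# so treat this only as a sketch of the intent, not a guaranteed keep-the-newest
deduped.show()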

asked Apr 12 '19 by lalatnayak

People also ask

How do I remove duplicates in a Spark DataFrame?

Duplicate rows can be removed (dropped) from a Spark SQL DataFrame using the distinct() and dropDuplicates() functions: distinct() removes rows that have the same values in all columns, whereas dropDuplicates() removes rows that have the same values in selected columns.

How do I drop duplicates in PySpark and keep the last?

Drop duplicate rows in PySpark by specific columns: dropDuplicates() takes one or more column names as arguments and removes rows that share the same values in those columns, keeping one row per distinct combination. On its own it does not guarantee keeping the latest row; for that, see the window-function approach in the answer below.
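For illustration, a minimal sketch of the difference between the two, on a made-up toy frame (c1 stands in for the timestamp, c2 and c3 are the key columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dropdup-demo").getOrCreate()

# made-up toy frame: one fully identical pair of rows plus two more rows
df = spark.createDataFrame(
    [(1, 'a', 'x'), (1, 'a', 'x'), (2, 'a', 'x'), (5, 'b', 'y')],
    ['c1', 'c2', 'c3'])

df.distinct().show()                    # drops rows identical in ALL columns -> 3 rows left
df.dropDuplicates(['c2', 'c3']).show()  # one row per (c2, c3) pair -> 2 rows, not necessarily the newest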


1 Answer

Using the row_number() Window function is probably the easiest approach for your task. Below, c1 is the timestamp column, and c2 and c3 are the columns used to partition your data:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# set rn with F.row_number() and filter the result by rn == 1
df_new = df.withColumn('rn', F.row_number().over(win)).where('rn = 1').drop('rn')
df_new.show()
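Applied to a toy frame like the one sketched above (rows (1, 'a', 'x'), (1, 'a', 'x'), (2, 'a', 'x') and (5, 'b', 'y') in columns c1, c2, c3), df_new would keep just (2, 'a', 'x') and (5, 'b', 'y'), i.e. the newest row per (c2, c3) pair.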

Edit:

If you need only the rows that actually have duplicates (dropping rows that are already unique), then add another field:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# window to cover all rows in the same partition
win2 = Window.partitionBy('c2', 'c3') \
             .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# set new columns: rn, cnt and filter the result by rn == 1 and cnt > 1
df_new = df.withColumn('rn', F.row_number().over(win)) \
           .withColumn('cnt', F.count('c1').over(win2)) \
           .where('rn = 1 and cnt > 1') \
           .drop('rn', 'cnt')
df_new.show()
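Note that win2 has no ordering, and its unbounded range covers every row of the (c2, c3) group, so cnt is the group size (the count of non-null c1 values); only groups with more than one row survive the filter. On the toy frame above, only the ('a', 'x') group has duplicates, so this variant would return just (2, 'a', 'x').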
answered Oct 05 '22 by jxc