 

Spark remove duplicate rows from DataFrame [duplicate]

Assume I have a DataFrame like:

val json = sc.parallelize(Seq("""{"a":1, "b":2, "c":22, "d":34}""","""{"a":3, "b":9, "c":22, "d":12}""","""{"a":1, "b":4, "c":23, "d":12}"""))
val df = sqlContext.read.json(json)

I want to remove duplicate rows for column "a" based on the value of column "b"; i.e., if there are duplicate rows for column "a", I want to keep the one with the larger value for "b". For the above example, after processing, I need only

{"a":3, "b":9, "c":22, "d":12}

and

{"a":1, "b":4, "c":23, "d":12}

The Spark DataFrame dropDuplicates API doesn't seem to support this. With the RDD approach I could do a map().reduceByKey(), but what DataFrame-specific operation is there to do this?

Appreciate some help, thanks.

asked Feb 19 '16 by void

People also ask

How do I remove duplicate rows in PySpark?

The PySpark distinct() function is used to drop duplicate rows (considering all columns) from a DataFrame, while dropDuplicates() is used to drop rows based on one or more selected columns.
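For illustration, a minimal sketch of both calls against the question's df (in Scala, which this question uses; the PySpark methods have the same names):

// Drop rows that are duplicates across ALL columns
val unique = df.distinct()

// Drop rows that share the same value in column "a" only;
// which duplicate survives is arbitrary -- see the answer below
// for keeping the row with the largest "b"
val uniqueByA = df.dropDuplicates(Seq("a"))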

How does dropDuplicates work in Spark?

dropDuplicates returns a new DataFrame with duplicate rows removed, optionally considering only certain columns. For a static batch DataFrame, it simply drops duplicate rows. For a streaming DataFrame, it keeps all data across triggers as intermediate state so it can drop duplicate rows.
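As a hedged sketch of the streaming case (Structured Streaming, Spark 2.1+, with a hypothetical streaming DataFrame events that carries an eventTime column), a watermark bounds how long that intermediate state is kept:

// Dedup state is kept across triggers; the watermark lets Spark
// discard state for events older than 10 minutes
val deduped = events
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("a", "eventTime")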

How do I find duplicate rows in a Spark DataFrame?

To find complete row duplicates, group on all the columns (df.columns) with the count() aggregate function and keep groups whose count is greater than one. To find column-level duplicates, group on just the required columns with count() and then filter to get the duplicate records.
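A sketch of both approaches, again against the question's df:

import org.apache.spark.sql.functions.col

// Complete row duplicates: group on every column (df.columns)
// and keep groups that occur more than once
val fullDupes = df.groupBy(df.columns.map(col): _*)
  .count()
  .filter("count > 1")

// Column-level duplicates: group on the key column(s) only
val aDupes = df.groupBy("a")
  .count()
  .filter("count > 1")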

How do I remove duplicate rows from a pandas DataFrame?

You can set keep=False in the drop_duplicates() function to remove all duplicate rows, e.g. df.drop_duplicates(keep=False).


1 Answer

You can use a window function in Spark SQL to achieve this.

df.registerTempTable("x")
sqlContext.sql("""
  SELECT a, b, c, d
  FROM (SELECT *,
               ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) rn
        FROM x) y
  WHERE rn = 1
""").collect

This will achieve what you need. Read more about window function support at https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
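If you would rather stay in the DataFrame API than write raw SQL, the same logic can be sketched with a Window spec (this is not from the answer above; it assumes Spark 1.6+, where row_number() is available in org.apache.spark.sql.functions):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, row_number}

// Rank rows within each "a" partition by "b" descending,
// then keep only the top-ranked row per key
val w = Window.partitionBy("a").orderBy(desc("b"))
val deduped = df.withColumn("rn", row_number().over(w))
  .filter("rn = 1")
  .drop("rn")

deduped.show()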

answered Nov 10 '22 by Pankaj Arora