In a Spark DataFrame, how do I get duplicate records and distinct records into two separate DataFrames?

I am working on a problem where I load data from a Hive table into a Spark DataFrame, and I now want all the unique accounts in one DataFrame and all the duplicates in another. For example, if I have acct ids 1, 1, 2, 3, 4, I want to get 2, 3, 4 in one DataFrame and 1, 1 in the other. How can I do this?

asked Oct 13 '16 by Shekhar

People also ask

How do I find duplicate rows in spark data frame?

➠ Find complete row duplicates: use groupBy on all the columns together with the count() aggregate function, then filter the rows whose count is greater than 1. ➠ Find column-level duplicates: use groupBy on only the required columns with the count() aggregate function, then filter to get the duplicate records.
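
A minimal Scala sketch of that groupBy/count idea (assuming a DataFrame named df):

import org.apache.spark.sql.functions.col
// rows whose full-column combination occurs more than once
val dupRows = df.groupBy(df.columns.map(col): _*).count().filter($"count" > 1)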

How can I find the difference between two spark data frames?

Use except() to subtract one DataFrame from another, i.e. to find the difference between two DataFrames.
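
For example, a sketch assuming two DataFrames df1 and df2 with the same schema:

// rows that are in df1 but not in df2
val onlyInDf1 = df1.except(df2)
// rows that are in df2 but not in df1
val onlyInDf2 = df2.except(df1)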

How do I check for duplicates in spark?

You can count the number of distinct rows on a set of columns and compare it with the total number of rows. If they are the same, there are no duplicate rows. If the number of distinct rows is less than the total number of rows, duplicates exist.
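
A quick sketch of that check, assuming a DataFrame named df:

// true if at least one row occurs more than once
val hasDuplicates = df.distinct().count() < df.count()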


2 Answers

val acctDF = List(("1", "Acc1"), ("1", "Acc1"), ("1", "Acc1"), ("2", "Acc2"), ("2", "Acc2"), ("3", "Acc3")).toDF("AcctId", "Details")
scala> acctDF.show()
+------+-------+
|AcctId|Details|
+------+-------+
|     1|   Acc1|
|     1|   Acc1|
|     1|   Acc1|
|     2|   Acc2|
|     2|   Acc2|
|     3|   Acc3|
+------+-------+
// Convert the DF to an RDD to apply map and reduceByKey, then convert back to a DF for further use

val countsDF = acctDF.rdd.map(rec => (rec(0), 1)).reduceByKey(_+_).map(rec=> (rec._1.toString, rec._2)).toDF("AcctId", "AcctCount")
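
(As an aside, a sketch of the same per-account counts computed directly with the DataFrame API, without going through the RDD; countsDF2 is just an illustrative name:)

// equivalent counts per AcctId using groupBy instead of the RDD map/reduceByKey
val countsDF2 = acctDF.groupBy("AcctId").count().withColumnRenamed("count", "AcctCount")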

val accJoinedDF = acctDF.join(countsDF, acctDF("AcctId")===countsDF("AcctId"), "left_outer").select(acctDF("AcctId"), acctDF("Details"), countsDF("AcctCount"))

scala> accJoinedDF.show()
+------+-------+---------+   
|AcctId|Details|AcctCount|
+------+-------+---------+
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     2|   Acc2|        2|
|     2|   Acc2|        2|
|     3|   Acc3|        1|
+------+-------+---------+


val distAcctDF = accJoinedDF.filter($"AcctCount"===1)
scala> distAcctDF.show()
+------+-------+---------+   
|AcctId|Details|AcctCount|
+------+-------+---------+
|     3|   Acc3|        1|
+------+-------+---------+

val duplAcctDF = accJoinedDF.filter($"AcctCount">1)
scala> duplAcctDF.show()
+------+-------+---------+                 
|AcctId|Details|AcctCount|
+------+-------+---------+
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     1|   Acc1|        3|
|     2|   Acc2|        2|
|     2|   Acc2|        2|
+------+-------+---------+

(Or, to show each duplicated account only once: scala> duplAcctDF.distinct.show() )
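
If you want the two result DataFrames back in the original (AcctId, Details) shape, the helper column can simply be dropped; a small sketch:

// drop the helper count column to recover the original schema
val distAcct = distAcctDF.drop("AcctCount")
val duplAcct = duplAcctDF.drop("AcctCount")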
answered Sep 24 '22 by KiranM


Depending on the version of Spark you have, you can use window functions on Datasets/SQL, as below:

// Requires: import static org.apache.spark.sql.functions.*; and import org.apache.spark.sql.expressions.Window;
Dataset<Row> New = df.withColumn("Duplicate", count("*").over(Window.partitionBy("id")));

Dataset<Row> Dups = New.filter(col("Duplicate").gt(1));

Dataset<Row> Uniques = New.filter(col("Duplicate").equalTo(1));

The above is written in Java; it should be similar in Scala. For how to do it in Python, read this: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
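
A rough Scala equivalent of the same window-function idea, sketched against the acctDF from the first answer:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.count
// count rows per AcctId with a window, then split on that count
val withDupCount = acctDF.withColumn("Duplicate", count("*").over(Window.partitionBy("AcctId")))
val dups = withDupCount.filter($"Duplicate" > 1)
val uniques = withDupCount.filter($"Duplicate" === 1)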

answered Sep 24 '22 by valearner