Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark Dataframe: Select distinct rows

I tried two ways to find distinct rows from parquet but it doesn't seem to work.
Attemp 1: Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();
But throws

Cannot have map type columns in DataFrame which calls set operations
(intersect, except, etc.), 
but the type of column canvasHashes is map<string,string>;;

Attemp 2: Tried running sql queries:

Dataset<Row> df = sqlContext.read().parquet("location.parquet");
    rawLandingDS.createOrReplaceTempView("df");
    Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");

error I get:

= SQL ==
SELECT distinct on timestamp * from df
-----------------------------^^^

Is there a way to get distinct records while reading parquet files? Any read option I can use.

like image 261
Himanshu Yadav Avatar asked Dec 14 '22 12:12

Himanshu Yadav


2 Answers

The problem you face is explicitly stated in the exception message - because MapType columns are neither hashable nor orderable cannot be used as a part of grouping or partitioning expression.

Your take on SQL solution is not logically equivalent to distinct on Dataset. If you want to deduplicate data based on a set of compatible columns you should use dropDuplicates:

df.dropDuplicates("timestamp")

which would be equivalent to

SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
       first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp

Unfortunately if your goal is actual DISTINCT it won't be so easy. On possible solution is to leverage Scala* Map hashing. You could define Scala udf like this:

spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)

and then use it in your Java code to derive column that can be used to dropDuplicates:

 df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  .dropDuplicates(
    // All columns excluding canvasHashes / hash_of_canvas_hashes
    "timestamp",  "c1", "c2", ..., "cn" 
    // Hash used as surrogate of canvasHashes
    "hash_of_canvas_hashes"         
  )

with SQL equivalent

SELECT 
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn    -- All columns excluding canvasHashes

* Please note that java.util.Map with its hashCode won't work, as hashCode is not consistent.

like image 116
user10938362 Avatar answered Dec 17 '22 23:12

user10938362


1) If you want to distinct based on coluns you can use it

val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("no", "age")


scala> df.show
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+

val distinctValuesDF = df.select(df("no")).distinct

scala> distinctValuesDF.show
+---+
| no|
+---+
|  1|
|  3|
+---+

2) If you have want unique on all column use dropduplicate

scala> val df = sc.parallelize(Array((1, 2), (3, 4),(3, 4), (1, 6))).toDF("no", "age")



scala> df.show

+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  3|  4|
|  1|  6|
+---+---+


scala> df.dropDuplicates().show()
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+
like image 29
vaquar khan Avatar answered Dec 18 '22 00:12

vaquar khan