I tried two ways to get distinct rows from a parquet file, but neither seems to work.
Attempt 1:
Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();
But it throws:
Cannot have map type columns in DataFrame which calls set operations
(intersect, except, etc.), 
but the type of column canvasHashes is map<string,string>;;
Attempt 2: I tried running a SQL query:
Dataset<Row> df = sqlContext.read().parquet("location.parquet");
df.createOrReplaceTempView("df");
Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");
The error I get:
== SQL ==
SELECT distinct on timestamp * from df
-----------------------------^^^
Is there a way to get distinct records while reading parquet files? Is there a read option I can use?
The problem you face is explicitly stated in the exception message: because MapType columns are neither hashable nor orderable, they cannot be used as part of a grouping or partitioning expression.
Your SQL attempt is not logically equivalent to distinct on a Dataset. If you want to deduplicate data based on a set of compatible columns, you should use dropDuplicates:
df.dropDuplicates("timestamp")
which would be equivalent to
SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
       first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp
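To make the "one row per key" semantics concrete, here is a minimal plain-Java sketch. It is not Spark code: the Row class and the dropDuplicatesByTimestamp helper are hypothetical names made up for illustration. It keeps the first row encountered for each timestamp; Spark itself does not promise which row of a group survives unless the ordering is fixed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DropDuplicatesSketch {
    // Hypothetical row shape: a timestamp plus one other column.
    static final class Row {
        final long timestamp;
        final String c1;

        Row(long timestamp, String c1) {
            this.timestamp = timestamp;
            this.c1 = c1;
        }
    }

    // Keeps the first row seen for each timestamp, in encounter order.
    static List<Row> dropDuplicatesByTimestamp(List<Row> rows) {
        Map<Long, Row> firstPerKey = new LinkedHashMap<>();
        for (Row row : rows) {
            firstPerKey.putIfAbsent(row.timestamp, row);
        }
        return new ArrayList<>(firstPerKey.values());
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row(1L, "a"));
        rows.add(new Row(2L, "b"));
        rows.add(new Row(1L, "c")); // same timestamp as the first row

        // Prints "1 a" then "2 b": the row (1, "c") is dropped.
        for (Row r : dropDuplicatesByTimestamp(rows)) {
            System.out.println(r.timestamp + " " + r.c1);
        }
    }
}
```

Note that the rows (1, "a") and (1, "c") are not identical, yet only one of them is kept. That is the difference from a true DISTINCT over whole rows.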
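Unfortunately, if your goal is an actual DISTINCT, it won't be so easy. One possible solution is to leverage Scala* Map hashing, bearing in mind that a hash is only a surrogate, so two different maps whose hashes collide would be treated as duplicates. You could define a Scala udf like this: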
spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)
and then use it in your Java code to derive a column that can be passed to dropDuplicates:
df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  .dropDuplicates(
    // All columns excluding canvasHashes / hash_of_canvas_hashes
    "timestamp", "c1", "c2", ..., "cn",
    // Hash used as surrogate of canvasHashes
    "hash_of_canvas_hashes"
  )
with SQL equivalent
SELECT 
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  scalaHash(canvasHashes)       -- Hash used as surrogate of canvasHashes
* Please note that java.util.Map with its hashCode won't work here, as its hashCode is not consistent.
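The surrogate-column idea can also be illustrated outside Spark. The following plain-Java sketch is a swapped-in variant, not the UDF approach above: instead of a hash it derives an order-independent list of the map's entries, so different maps cannot collide. The Row class and the surrogate and distinct helpers are hypothetical names, and the sketch assumes the maps contain no null keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MapSurrogateDistinctSketch {
    // Hypothetical row shape: one scalar column plus the map column.
    static final class Row {
        final long timestamp;
        final Map<String, String> canvasHashes;

        Row(long timestamp, Map<String, String> canvasHashes) {
            this.timestamp = timestamp;
            this.canvasHashes = canvasHashes;
        }
    }

    // Flattens the map into [k1, v1, k2, v2, ...] with keys in sorted order,
    // so two maps with equal contents always yield equal lists.
    static List<String> surrogate(Map<String, String> map) {
        List<String> flat = new ArrayList<>();
        for (Map.Entry<String, String> e : new TreeMap<>(map).entrySet()) {
            flat.add(e.getKey());
            flat.add(e.getValue());
        }
        return flat;
    }

    // Whole-row distinct: the key is every scalar column plus the surrogate.
    static List<Row> distinct(List<Row> rows) {
        Map<List<Object>, Row> seen = new LinkedHashMap<>();
        for (Row row : rows) {
            List<Object> key =
                Arrays.<Object>asList(row.timestamp, surrogate(row.canvasHashes));
            seen.putIfAbsent(key, row);
        }
        return new ArrayList<>(seen.values());
    }

    public static void main(String[] args) {
        Map<String, String> a = new LinkedHashMap<>();
        a.put("x", "1");
        a.put("y", "2");
        Map<String, String> b = new LinkedHashMap<>();
        b.put("y", "2"); // same contents as a, different insertion order
        b.put("x", "1");
        Map<String, String> c = new LinkedHashMap<>();
        c.put("x", "9");

        List<Row> rows = Arrays.asList(new Row(1L, a), new Row(1L, b), new Row(1L, c));
        System.out.println(distinct(rows).size()); // prints 2
    }
}
```

The trade-off is that the surrogate is as large as the map itself, whereas a hash is a single integer but can occasionally merge rows that are not really equal.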
1) If you want distinct values of specific columns, select those columns and call distinct:
scala> val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("no", "age")
scala> df.show
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+
scala> val distinctValuesDF = df.select(df("no")).distinct
scala> distinctValuesDF.show
+---+
| no|
+---+
|  1|
|  3|
+---+
2) If you want unique rows across all columns, use dropDuplicates:
scala> val df = sc.parallelize(Array((1, 2), (3, 4),(3, 4), (1, 6))).toDF("no", "age")
scala> df.show
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  3|  4|
|  1|  6|
+---+---+
scala> df.dropDuplicates().show()
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+