
Which row is kept by the dropDuplicates operator?

Which row will be kept when one uses the dropDuplicates function on a Spark DataFrame? It is not stated in the Spark documentation.

  1. Keep First (according to row order)
  2. Keep Last (according to row order)
  3. Random?

P.S. Assume a distributed YARN environment (not local master).

asked Jun 23 '17 by Qmage


People also ask

How does dropDuplicates work in Spark?

dropDuplicates(): the PySpark DataFrame API provides a dropDuplicates() function that drops duplicate rows from a DataFrame. The function takes column names as parameters; duplicates are determined with respect to those columns.
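Since the rest of this page uses Scala, here is a minimal Scala sketch of deduplicating on a subset of columns (the sample data and column names are made up for illustration; assumes the spark session from spark-shell):

import spark.implicits._

// hypothetical sample data: two rows share the same (name, dept) pair
val people = Seq(
  ("alice", "eng", 10),
  ("alice", "eng", 20),
  ("bob",   "ops", 30)
).toDF("name", "dept", "score")

// deduplicate with respect to name and dept only; score is ignored
// when comparing rows, so only one of the alice/eng rows survives
people.dropDuplicates("name", "dept").show()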

How do you use dropDuplicates?

Return a new DataFrame with duplicate rows removed, optionally considering only certain columns. For a static batch DataFrame, it simply drops duplicate rows. For a streaming DataFrame, it keeps all data across triggers as intermediate state in order to drop duplicate rows.
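For the streaming case, the deduplication state is usually bounded with a watermark so that old keys can eventually be dropped. A minimal sketch (the rate source and the column choices are just for illustration):

// the built-in "rate" source emits rows with (timestamp, value) columns
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()

// keep at most 10 minutes of deduplication state per key
val deduped = stream
  .withWatermark("timestamp", "10 minutes")
  .dropDuplicates("value", "timestamp")

deduped.writeStream
  .format("console")
  .outputMode("append")
  .start()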

What is the difference between dropDuplicates and distinct?

Both can be used to eliminate duplicate rows of a Spark DataFrame. The difference is that distinct() takes no arguments at all, while dropDuplicates() can be given a subset of columns to consider when dropping duplicated records.
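A small Scala sketch of that difference (the key/value columns are illustrative; assumes a SparkSession named spark):

import spark.implicits._

val df = Seq(("a", 1), ("a", 1), ("a", 2)).toDF("key", "value")

// distinct() compares entire rows and takes no arguments:
// the two identical ("a", 1) rows collapse into one
df.distinct().show()

// dropDuplicates can be restricted to a subset of columns:
// only a single row remains for key "a"
df.dropDuplicates("key").show()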

How do I remove duplicates in RDD?

Use distinct() to remove duplicate rows from a DataFrame: it returns a new DataFrame after removing the duplicate records. Alternatively, you can run dropDuplicates(), which also returns a new DataFrame with duplicate rows removed.
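On a plain RDD there is no dropDuplicates; the analogous call is distinct(). A minimal sketch, assuming a SparkContext named sc:

// distinct() shuffles the data and removes duplicate elements
val rdd = sc.parallelize(Seq(1, 2, 2, 3, 3, 3))
rdd.distinct().collect()   // Array(1, 2, 3), in no particular order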


1 Answer

TL;DR Keep First (according to row order)
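A quick way to observe this for yourself (the data below is made up; note that with multiple partitions the notion of "first" depends on how the rows are laid out, so this sketch coalesces to a single partition to get a deterministic result):

import spark.implicits._

// two rows share the key "a" but carry different payloads
val df = Seq(("a", "first"), ("a", "second"), ("b", "only")).toDF("key", "payload")

// with a single partition, the first row per key should be the one that survives
df.coalesce(1).dropDuplicates("key").show()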

The dropDuplicates operator in Spark SQL creates a logical plan with a Deduplicate operator.

That Deduplicate operator is translated to the First logical operator by Spark SQL's Catalyst Optimizer, which answers your question nicely (!)

You can see the Deduplicate operator in the logical plan below.

// create a dataset with duplicates
val dups = spark.range(9).map(_ % 3)

val q = dups.dropDuplicates

The following is the logical plan of the q dataset.

scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))

The Deduplicate operator is then translated to the First logical operator (which shows up as an Aggregate operator after optimizations).

scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))

After spending some time reviewing the Apache Spark code base, I found that the dropDuplicates operator is equivalent to groupBy followed by the first function.

first(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the first value of a column in a group.

import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))

scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
   +- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
      +- *SerializeFromObject [input[0, bigint, false] AS value#64L]
         +- *MapElements <function1>, obj#63: bigint
            +- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
               +- *Range (0, 9, step=1, splits=8)
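For comparison, you can print the physical plan of the q dataset (i.e. dups.dropDuplicates) as well; it should show the same HashAggregate / Exchange / HashAggregate shape as the groupBy-with-first query above (output omitted here, since it varies across Spark versions):

// q was defined earlier as dups.dropDuplicates
q.explain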

I also think that the dropDuplicates operator may be more performant.
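If you want to check that claim on your own data, here is a rough, unscientific sketch using spark.time; it measures a whole action, so results depend heavily on the data and the cluster (assumes the dups dataset and the first import from above):

// crude wall-clock comparison of the two equivalent queries
spark.time(dups.dropDuplicates.count())
spark.time(dups.groupBy("value").agg(first("value") as "value").count())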

answered Sep 21 '22 by Jacek Laskowski