 

Apache Spark: distinct doesn't work?

Here is my code example:

 case class Person(name: String, tel: String) {
   def equals(that: Person): Boolean = that.name == this.name && this.tel == that.tel
 }

 val persons = Array(Person("peter","139"),Person("peter","139"),Person("john","111"))
 sc.parallelize(persons).distinct.collect

It returns

 res34: Array[Person] = Array(Person(john,111), Person(peter,139), Person(peter,139))

Why doesn't distinct work? I want the result to be Person("john","111"), Person("peter","139")

asked Jul 22 '14 by edwardsbean

People also ask

Is distinct an action in spark?

distinct is a transformation. This means that it is not executed immediately, but only when an action is called. collect is an action. Calling the collect method causes all previous transformations to be run.
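The same laziness can be seen in plain Scala, without Spark at all: a LazyList's map is deferred like a transformation, and forcing it with toList plays the role of an action. This is only an analogy sketch, not Spark code:

```scala
object LazyDemo extends App {
  var evaluated = 0

  // map is deferred, like a Spark transformation such as distinct
  val xs = LazyList(1, 2, 3).map { x => evaluated += 1; x * 2 }
  assert(evaluated == 0) // nothing has run yet

  // toList forces evaluation, like calling the collect action
  val result = xs.toList
  assert(result == List(2, 4, 6))
  assert(evaluated == 3) // only now has the mapping function run
}
```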

How does PySpark distinct work?

In PySpark, the distinct() function is widely used to drop duplicate rows, considering all columns of the DataFrame. The dropDuplicates() function is widely used to drop duplicate rows based on the selected (one or multiple) columns.

How do I count distinct values in spark DataFrame?

In PySpark, there are two ways to get the count of distinct values. We can chain the distinct() and count() functions of a DataFrame to get the distinct count. Another way is the SQL countDistinct() function, which returns the distinct-value count over the selected columns.

How do you filter distinct values in PySpark DataFrame?

To select distinct values on multiple columns, use dropDuplicates(). This function takes the columns on which you want distinct values and returns a new DataFrame with unique rows on the selected columns.
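The DataFrame operations above have direct counterparts in Spark's Scala API. The sketch below is illustration only, not a standalone program: it assumes a live SparkSession named spark, as provided by spark-shell.

```scala
import org.apache.spark.sql.functions.countDistinct
import spark.implicits._  // spark is the SparkSession provided by spark-shell

val df = Seq(("peter", "139"), ("peter", "139"), ("john", "111")).toDF("name", "tel")

df.distinct()                            // unique rows, considering all columns
df.dropDuplicates("name")                // unique rows by the "name" column only
df.select(countDistinct("name", "tel"))  // distinct-value count over selected columns
```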


1 Answer

Extending the observation from @aaronman, there is a workaround for this issue. On the RDD, there are two definitions for distinct:

 /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = distinct(partitions.size)

It's apparent from the signature of the first distinct that it takes an implicit ordering of the elements, assumed null if absent, which is what the short version .distinct() ends up using.
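The map/reduceByKey pipeline in that definition can be mimicked with plain Scala collections (using groupBy in place of the distributed reduceByKey), which makes it clear that deduplication ultimately leans on the elements' equals and hashCode. A sketch, not Spark itself:

```scala
object DistinctSketch extends App {
  case class Person(name: String, tel: String)

  val persons = Seq(Person("peter", "139"), Person("peter", "139"), Person("john", "111"))

  // Same shape as RDD.distinct: pair each element with null, collapse by key, keep keys.
  val distinct = persons.map(p => (p, null)).groupBy(_._1).keys.toSeq

  // Case-class equals/hashCode are structural, so the duplicate peter collapses.
  assert(distinct.toSet == Set(Person("peter", "139"), Person("john", "111")))
}
```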

There's no default implicit ordering for case classes, but it's easy to implement one:

case class Person(name:String,tel:String) extends Ordered[Person] {
  def compare(that: Person): Int = this.name compare that.name
}

Now, trying the same example delivers the expected results (note that the Ordering here compares only names):

 val persons = Array(Person("peter","139"),Person("peter","139"),Person("john","111"))
 sc.parallelize(persons).distinct.collect

 res: Array[Person] = Array(Person(john,111), Person(peter,139))

Note that case classes already implement equals and hashCode, so the equals in the provided example is unnecessary and also incorrect. The correct signature for equals is equals(arg0: Any): Boolean, so the question's version merely overloads it instead of overriding it. BTW, I first thought that the issue had to do with the incorrect equals signature, which sent me down the wrong path.
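A small plain-Scala check of that point: the compiler-generated equals(Any) and hashCode of a case class are structural, so hash-based deduplication already works without any hand-written equals. This is a sketch with illustrative names:

```scala
object EqualsDemo extends App {
  case class Person(name: String, tel: String)

  val a = Person("peter", "139")
  val b = Person("peter", "139")

  // Structural equality and a consistent hashCode come for free with case classes.
  assert(a == b)
  assert(a.hashCode == b.hashCode)
  assert(Set(a, b).size == 1)  // hash-based dedup already collapses the duplicate
}
```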

answered Oct 13 '22 by maasg