Here is my code example:
case class Person(name: String, tel: String) {
  def equals(that: Person): Boolean = that.name == this.name && this.tel == that.tel
}
val persons = Array(Person("peter","139"),Person("peter","139"),Person("john","111"))
sc.parallelize(persons).distinct.collect
It returns
res34: Array[Person] = Array(Person(john,111), Person(peter,139), Person(peter,139))
Why doesn't distinct work? I want the result to be Person("john","111"), Person("peter","139").
distinct is a transformation, which means it is not executed immediately but only when an action is called. collect is an action: calling it causes all previous transformations to run.
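To make the transformation/action distinction concrete, here is a minimal sketch, assuming a live SparkContext named sc (e.g. inside spark-shell); the variable names are illustrative, not from the original question:

```scala
// Building the RDD and calling distinct sets up the lineage only;
// no Spark job runs at this point.
val rdd         = sc.parallelize(Seq(1, 2, 2, 3))
val distinctRdd = rdd.distinct()   // transformation: lazy, nothing executes yet

// collect is an action: it triggers execution of the whole lineage
// (parallelize -> distinct) and brings the results to the driver.
val result = distinctRdd.collect()
```

This is why the behavior of distinct is only observed at the collect call, even though the logic lives in the transformation.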
Extending further from the observation of @aaronman, there is a workaround for this issue.
On the RDD, there are two definitions of distinct:
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = distinct(partitions.size)
It's apparent from the signature of the first distinct that an implicit ordering of the elements is expected, and it is assumed null if absent, which is what the short version .distinct() does.
There's no default implicit ordering for case classes, but it's easy to implement one:
case class Person(name: String, tel: String) extends Ordered[Person] {
  def compare(that: Person): Int = this.name compare that.name
}
Now, trying the same example delivers the expected results (note that I'm comparing names):
val persons = Array(Person("peter","139"),Person("peter","139"),Person("john","111"))
sc.parallelize(persons).distinct.collect
res: Array[Person] = Array(Person(john,111), Person(peter,139))
Note that case classes already implement equals and hashCode, so the implementation in the provided example is unnecessary and also incorrect. The correct signature for equals is: equals(arg0: Any): Boolean
-- BTW, I first thought that the issue had to do with the incorrect equals signature, which sent me looking down the wrong path.
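For illustration, here is what a correct manual override looks like, sketched on a hypothetical plain class (case classes generate equivalent code automatically, so you would not write this for the Person case class itself). The key point is that the parameter type must be Any, otherwise you merely overload equals instead of overriding it:

```scala
// Hypothetical plain class used only to show the correct override signature.
class PersonLike(val name: String, val tel: String) {
  // Overrides Any.equals: parameter type is Any, with a pattern match.
  override def equals(other: Any): Boolean = other match {
    case p: PersonLike => p.name == name && p.tel == tel
    case _             => false
  }

  // equals and hashCode must be consistent: equal objects need equal hashes,
  // otherwise hash-based operations (like Spark's reduceByKey) break.
  override def hashCode: Int = (name, tel).##
}

val a = new PersonLike("peter", "139")
val b = new PersonLike("peter", "139")
println(a == b)                    // dispatches through equals(Any)
println(a.hashCode == b.hashCode)
```

A def equals(that: Person) like the one in the question compiles fine but is never called by generic code such as collections or Spark, which always goes through equals(Any).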