Does distinct() sort the dataset?

I am writing a preprocessing application that, among other transformations and actions, sorts the dataset at the end before writing it to HDFS. A new request requires me to de-duplicate the dataset, so I would like to do that in the same stage as the sort. My understanding is that effective de-duplication requires sorting (maybe I am wrong about this; I haven't researched it much, it just seems natural).

For certain reasons (a MapType column in the output schema), I first tested distinct in an earlier stage than the sort, planning to get rid of the MapType columns later so that the two stages could be merged.

[Screenshot: Spark UI output]

What happened is that the second stage, the sort, was skipped, as if the dataset were already sorted. This makes sense to me, but it is not documented anywhere (AFAIK), and I don't know whether it is stable, expected behavior (I don't want to push this to production just to find out that I am suddenly running two expensive stages: both sort and distinct). Does anyone have more insight into how sort and/or distinct are implemented?

asked Dec 17 '22 by magnus

1 Answer

In Spark, distinct and, in general, all the aggregation operations (such as groupBy) do not sort the data. We can check that easily using the explain function.

// Let's generate a df with 5 elements in [0, 4) so there is at least one duplicate
val data = spark.range(5).select(floor(rand() * 4) as "r")

data.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#105L], functions=[])
+- Exchange hashpartitioning(r#105L, 200)
   +- *HashAggregate(keys=[r#105L], functions=[])
      +- *Project [FLOOR((rand(7842501052366484791) * 4.0)) AS r#105L]
         +- *Range (0, 5, step=1, splits=2)

HashAggregate + Exchange means that the elements are hashed and shuffled so that elements with the same hash end up in the same partition. Then, within each partition, elements with the same hash are compared and de-duplicated. The data is therefore not sorted after the process. Let's check that:

data.distinct.show()
+---+                                                                           
|  r|
+---+
|  0|
|  3|
|  2|
+---+

Let's address your concern about performance now. If you sort after de-duplicating, here is what happens.

data.distinct.orderBy("r").explain
== Physical Plan ==
*Sort [r#227L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
   +- *HashAggregate(keys=[r#227L], functions=[])
      +- Exchange hashpartitioning(r#227L, 200)
         +- *HashAggregate(keys=[r#227L], functions=[])
            +- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
               +- *Range (0, 5, step=1, splits=2)

We can see that the data is shuffled to be de-duplicated (Exchange hashpartitioning) and shuffled again to be sorted (Exchange rangepartitioning). That's quite expensive. This is because sorting needs a shuffle by range, so that elements within the same range end up in the same partition, which can then be sorted. Yet we can be smarter and sort before de-duplicating:

data.orderBy("r").distinct.explain
== Physical Plan ==
*HashAggregate(keys=[r#227L], functions=[])
+- *HashAggregate(keys=[r#227L], functions=[])
   +- *Sort [r#227L ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(r#227L ASC NULLS FIRST, 200)
         +- *Project [FLOOR((rand(-8636860894475783181) * 4.0)) AS r#227L]
            +- *Range (0, 5, step=1, splits=2)

Only one exchange remains. Indeed, Spark knows that after a shuffle by range, duplicate elements land in the same partition, so it does not trigger a new shuffle.
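
Since you are worried about whether this behavior is stable enough to push to production, one option is to assert on the physical plan itself, for instance in a test that runs before you deploy. The sketch below is only an illustration: queryExecution and executedPlan are developer APIs, and the string representation of the plan may change between Spark versions, so treat this as a regression guard rather than a guarantee.

// Sketch: count the shuffles in the physical plan of the combined pipeline.
// queryExecution/executedPlan are developer APIs; their string format may
// vary across Spark versions, so adapt the pattern to your version.
val plan = data.orderBy("r").distinct.queryExecution.executedPlan.toString
val numShuffles = "Exchange".r.findAllIn(plan).length
assert(numShuffles == 1, s"expected a single shuffle, plan was:\n$plan")

If the assertion starts failing after a Spark upgrade, you know the optimizer no longer merges the two operations into a single shuffle.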

answered Jan 08 '23 by Oli