
How to apply partial sort on a Spark DataFrame?

The following code:

val myDF = Seq(83, 90, 40, 94, 12, 70, 56, 70, 28, 91).toDF("number")
myDF.orderBy("number").limit(3).show

outputs:

+------+
|number|
+------+
|    12|
|    28|
|    40|
+------+

Does Spark's laziness in combination with the limit call and the implementation of orderBy automatically result in a partially sorted DataFrame, or are the remaining 7 numbers also sorted, even though it's not needed? And if so, is there a way to avoid this needless computational work?


Calling .explain() shows that two sort stages are performed: first on each partition, and then a global one over the top 3 of each. But it does not state whether these sorts are full or partial.

myDF.orderBy("number").limit(3).explain(true)
== Parsed Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3416 ASC NULLS FIRST], true
      +- Project [value#3414 AS number#3416]
         +- LocalRelation [value#3414]

== Analyzed Logical Plan ==
number: int
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3416 ASC NULLS FIRST], true
      +- Project [value#3414 AS number#3416]
         +- LocalRelation [value#3414]

== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3416 ASC NULLS FIRST], true
      +- LocalRelation [number#3416]

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[number#3416 ASC NULLS FIRST], output=[number#3416])
+- LocalTableScan [number#3416]
Tobias Hermann asked Oct 27 '25 06:10


1 Answer

If you call explain() on your DataFrame, you'll find that Spark first does a "local" sort within each partition, then picks only the top three elements from each partition for a final global sort, and finally takes the top three from that result.

scala> myDF.orderBy("number").limit(3).explain(true)
== Parsed Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3 ASC NULLS FIRST], true
      +- Project [value#1 AS number#3]
         +- LocalRelation [value#1]

== Analyzed Logical Plan ==
number: int
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3 ASC NULLS FIRST], true
      +- Project [value#1 AS number#3]
         +- LocalRelation [value#1]

== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- Sort [number#3 ASC NULLS FIRST], true
      +- LocalRelation [number#3]

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[number#3 ASC NULLS FIRST], output=[number#3])
+- LocalTableScan [number#3]

I think it's best seen in the Optimized Logical Plan section, but the physical plan says the same thing.
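To make the "top three per partition, then top three overall" idea concrete, here is a minimal plain-Scala sketch that needs no Spark. It is only an illustration of the general technique, not Spark's actual implementation: the names PartialSortSketch, topK and takeOrdered are made up for this example, and the split of the question's numbers into two partitions is an assumption. Each partition is reduced with a bounded heap that never holds more than k elements, so the discarded elements are never fully sorted.

```scala
import scala.collection.mutable

// Hypothetical helper object, for illustration only (not part of Spark).
object PartialSortSketch {

  // Keep only the k smallest elements of one partition using a bounded max-heap.
  // The heap never grows beyond k entries, so the cost is O(n log k)
  // instead of the O(n log n) of a full sort.
  def topK(partition: Iterator[Int], k: Int): Seq[Int] = {
    // With the default Int ordering, head is the largest value currently kept.
    val heap = mutable.PriorityQueue.empty[Int]
    partition.foreach { x =>
      if (heap.size < k) heap.enqueue(x)
      else if (k > 0 && x < heap.head) {
        heap.dequeue() // drop the largest kept value
        heap.enqueue(x)
      }
    }
    heap.toList.sorted // sorts at most k elements
  }

  // Apply topK to every partition, then once more to the merged candidates.
  def takeOrdered(partitions: Seq[Seq[Int]], k: Int): Seq[Int] =
    topK(partitions.iterator.flatMap(p => topK(p.iterator, k)), k)

  def main(args: Array[String]): Unit = {
    // Assumed split of the question's data into two partitions.
    val partitions = Seq(Seq(83, 90, 40, 94, 12), Seq(70, 56, 70, 28, 91))
    println(takeOrdered(partitions, 3).mkString(", ")) // prints "12, 28, 40"
  }
}
```

With this split, the first partition contributes 12, 40, 83 and the second 28, 56, 70, so only six candidates reach the final step and the other values are dropped without ever being ordered relative to each other.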

mazaneicha answered Oct 29 '25 09:10


