
Does PySpark change the order of instructions for optimization?

Let's say I have the following pipeline:

df.orderBy('foo').limit(10).show()

Here we can see that the orderBy instruction comes first, so all rows of the dataframe should be sorted before the limit instruction is executed. I found myself wondering whether Spark does some "reorganization" inside the pipeline in order to improve performance (for example, executing the limit instruction before the orderBy). Does Spark do that?

asked Dec 05 '19 by flpn


3 Answers

Your assumption is correct. Spark executes sort and then limit on each partition before merging/collecting the results as we will see next.

An orderBy followed by a limit triggers the following calls:

  • [Dataset.scala] Dataset:orderBy()
  • [Dataset.scala] Dataset:sortInternal()
  • [SparkStrategies.scala] SpecialLimits:apply()
  • [limit.scala] TakeOrderedAndProjectExec:doExecute()
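
Before diving into that method, we can confirm from the PySpark side that this operator is actually selected, by printing the physical plan. A minimal sketch (the dataframe and the 'foo' column are made up for illustration, and the exact plan text varies by Spark version):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Hypothetical dataframe with a 'foo' column, only for illustration
df = spark.range(100).withColumnRenamed("id", "foo")

# The physical plan should contain a TakeOrderedAndProject node instead of
# a global Sort followed by a separate Limit
df.orderBy("foo").limit(10).explain()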

Looking into the TakeOrderedAndProjectExec:doExecute() method, we first meet the following code:

protected override def doExecute(): RDD[InternalRow] = {
    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
    val localTopK: RDD[InternalRow] = {
      child.execute().map(_.copy()).mapPartitions { iter =>
        org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
      }
    }

......

Here we can see that localTopK is populated by taking the top-K records from each partition. That means that Spark pushes the top-K filter down to the partition level as early as possible.
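Conceptually, this per-partition step can be mimicked with plain PySpark RDD operations. The sketch below is purely illustrative and not Spark's actual code path; heapq.nsmallest stands in for Utils.takeOrdered:

import heapq

from pyspark.sql import SparkSession

sc = SparkSession.builder.master("local[*]").getOrCreate().sparkContext

limit = 5
rdd = sc.parallelize(range(1, 21), 2)  # partition 1: 1..10, partition 2: 11..20

# Take the top-K (here: the K smallest) records inside each partition,
# so at most `limit` records per partition survive
local_top_k = rdd.mapPartitions(lambda it: heapq.nsmallest(limit, it))

print(local_top_k.glom().collect())  # [[1, 2, 3, 4, 5], [11, 12, 13, 14, 15]]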

The next lines:

....

val shuffled = new ShuffledRowRDD(
      ShuffleExchangeExec.prepareShuffleDependency(
        localTopK,
        child.output,
        SinglePartition,
        serializer,
        writeMetrics),
      readMetrics)
    shuffled.mapPartitions { iter =>
      val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
      if (projectList != child.output) {
        val proj = UnsafeProjection.create(projectList, child.output)
        topK.map(r => proj(r))
      } else {
        topK
      }
    }

generate the final ShuffledRowRDD from all partitions, collected into a single partition, which contains the final top-K sorted records that make up the result of limit.
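PySpark exposes the same per-partition-then-merge idea through the public RDD.takeOrdered API, which can serve as a rough analogue of what TakeOrderedAndProjectExec does (an illustration of the idea, not the DataFrame code path):

from pyspark.sql import SparkSession

sc = SparkSession.builder.master("local[*]").getOrCreate().sparkContext

# takeOrdered reduces each partition to its top-K first and only then
# merges those small candidate lists to produce the global result
print(sc.parallelize(range(1, 21), 2).takeOrdered(5))  # [1, 2, 3, 4, 5]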

Example

Let's illustrate this with an example. Consider a dataset with the range 1,2,3...20, partitioned into two parts. The first one contains the odd numbers while the second one contains the even numbers, as shown next:

-----------   -----------
|   P1    |   |   P2    | 
-----------   -----------
|   1     |   |   2     |
|   3     |   |   4     |
|   5     |   |   6     |
|   7     |   |   8     |
|   9     |   |   10    |
|  ....   |   |  ....   |
|   19    |   |   20    |
-----------   -----------

When df.orderBy(...).limit(5) is executed, Spark first gets the top 5 sorted records from each partition, i.e. 1-9 for the first one and 2-10 for the second one. Then it merges and sorts them into the sequence 1,2,3,4,5...10. Finally, it takes the top 5 records, producing the final list 1,2,3,4,5.
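The example can be reproduced in PySpark. The snippet below builds the two partitions explicitly so that the odd/even split is guaranteed (just a sketch; how the data gets partitioned in practice will differ):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

# Two explicit partitions: P1 = odd numbers, P2 = even numbers
odds = list(range(1, 21, 2))
evens = list(range(2, 21, 2))
df = sc.parallelize([odds, evens], 2) \
       .flatMap(lambda xs: xs) \
       .map(lambda x: (x,)) \
       .toDF(["value"])

print(df.rdd.getNumPartitions())     # 2
df.orderBy("value").limit(5).show()  # rows 1, 2, 3, 4, 5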

Conclusion

When orderBy is followed by limit, Spark leverages all the available information and avoids processing the whole dataset, keeping only the first top-K rows. As @ShemTov already mentioned, there is no need to call limit before orderBy, since first that would return an incorrect dataset and second because Spark does all the necessary optimisations internally for you.

answered Sep 22 '22 by abiratsis


Spark does optimization when needed, but in your case it can't do the limit before the orderBy because you'd get incorrect results.

This code means: I want Spark to order all rows by the foo column, and then give me the top 10.

answered Sep 19 '22 by ShemTov


Simply put, yes it does, but it doesn't change the result in any case. That's why we call it optimization.

Spark gives us two kinds of operations for solving any problem: transformations and actions.

When we apply a transformation to an RDD, it gives us a new RDD, but it does not start executing those transformations. Execution happens only when an action is performed on the resulting RDD, which gives us the final result.
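A minimal sketch of this laziness (illustrative only; any transformation/action pair would behave the same):

from pyspark.sql import SparkSession

sc = SparkSession.builder.master("local[*]").getOrCreate().sparkContext

rdd = sc.parallelize(range(10))

# Transformation: returns a new RDD immediately, nothing runs yet
squared = rdd.map(lambda x: x * x)

# Action: only now does Spark schedule and execute the job
print(squared.collect())  # [0, 1, 4, 9, ..., 81]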

So once you perform an action on an RDD, the Spark context hands your program over to the driver.

The driver creates the DAG (directed acyclic graph) or execution plan (job) for your program. Once the DAG is created, the driver divides this DAG into a number of stages. These stages are then divided into smaller tasks and all the tasks are given to the executors for execution.

The Spark driver is responsible for converting a user program into units of physical execution called tasks. At a high level, all Spark programs follow the same structure. They create RDDs from some input, derive new RDDs from those using transformations, and perform actions to collect or save data. A Spark program implicitly creates a logical directed acyclic graph (DAG) of operations.

When the driver runs, it converts this logical graph into a physical execution plan.
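Both plans can be inspected directly. As a hedged example, reusing a hypothetical dataframe shaped like the one in the original question, explain(True) prints the logical plans followed by the physical plan the driver will actually execute (the exact output depends on the Spark version):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Hypothetical dataframe with a 'foo' column, just for illustration
df = spark.range(100).withColumnRenamed("id", "foo")

# Prints the parsed, analyzed and optimized logical plans,
# then the physical execution plan
df.orderBy("foo").limit(10).explain(True)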

answered Sep 19 '22 by Alperen Tahta