How does Spark achieve sort order?

Assume I have a list of Strings. I filter and sort them, and collect the result to the driver. However, things are distributed, and each partition of the RDD holds its own part of the original list. So how does Spark achieve the final sorted order? Does it merge the results?

asked Oct 01 '15 by dveim

People also ask

How does PySpark sort data?

You can use either the sort() or orderBy() method of a PySpark DataFrame to sort it in ascending or descending order by one or more columns; PySpark SQL's sorting functions can be used as well.
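For illustration, here is a minimal sketch in the Scala Dataset API, which the PySpark sort()/orderBy() calls mirror one-to-one (the example data and the SparkSession named `spark` are assumptions):

import org.apache.spark.sql.functions.{asc, desc}

// hypothetical example data
val df = spark.createDataFrame(Seq(("b", 2), ("a", 3), ("c", 1)))
  .toDF("key", "value")

df.sort(asc("key")).show()                  // ascending by a single column
df.orderBy(desc("value")).show()            // descending; orderBy is an alias of sort
df.sort(asc("key"), desc("value")).show()   // multiple columns, mixed directions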

How does Spark sort RDD?

From the documentation of sortByKey([ascending], [numTasks]): when called on a dataset of (K, V) pairs where K implements Ordered, it returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified by the boolean ascending argument.
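A minimal sketch, assuming a SparkContext named `sc` and made-up example pairs:

val pairs = sc.parallelize(Seq(("b", 2), ("a", 3), ("c", 1)))

pairs.sortByKey().collect()
// Array((a,3), (b,2), (c,1))

pairs.sortByKey(ascending = false, numPartitions = 2).collect()
// Array((c,1), (b,2), (a,3))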

What is the difference between sort and orderBy in Spark?

In Spark's DataFrame API, orderBy() is simply an alias for sort(): both produce a total order across all partitions and therefore require a shuffle. The cheaper variant that sorts each partition individually, without a shuffle and without guaranteeing a global order, is sortWithinPartitions().
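The difference is easiest to see side by side; a sketch assuming a DataFrame `df` with a "key" column:

df.orderBy("key")               // total order across partitions (requires a shuffle)
df.sort("key")                  // identical: orderBy is an alias for sort
df.sortWithinPartitions("key")  // sorts each partition locally; no shuffle, no global order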

What algorithm does Spark use?

TimSort: In Apache Spark 1.1, the default sorting algorithm was switched from quicksort to TimSort, a hybrid of merge sort and insertion sort. It performs better than quicksort on most real-world datasets, especially ones that are partially ordered, and Spark uses it in both the map and reduce phases.
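Spark's internal TimSort is not public API, but the JDK's Arrays.sort for object arrays is also a TimSort, so the algorithm's behavior on partially ordered input can be observed directly:

// Boxed so that Arrays.sort takes the object (TimSort) path,
// not the primitive dual-pivot quicksort path.
val partiallyOrdered: Array[Integer] =
  Array(1, 2, 3, 4, 5, 9, 8, 7, 6, 10).map(Int.box)

java.util.Arrays.sort(partiallyOrdered)  // TimSort exploits the pre-sorted runs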


1 Answer

Sorting in Spark is a multi-phase process that requires a shuffle:

  1. the input RDD is sampled, and the sample is used to compute the boundaries of each output partition (sample followed by collect)
  2. the input RDD is partitioned with a RangePartitioner using the boundaries computed in the first step (partitionBy)
  3. each partition from the second step is sorted locally (mapPartitions)

When the data is collected, all that is left is to follow the order defined by the partitioner.
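These phases can be reproduced by hand on a plain RDD. The following is a minimal sketch, assuming a SparkContext named `sc`; it is not Spark's actual sortBy implementation, but it mirrors its structure:

import org.apache.spark.RangePartitioner

// key the values by themselves so the pair-RDD machinery applies
val rdd = sc.parallelize(Seq(4, 2, 5, 3, 1)).map(x => (x, x))

// 1. sampling happens inside RangePartitioner to pick the boundaries
val partitioner = new RangePartitioner(2, rdd)

// 2. shuffle so that partition i only holds keys below those of partition i+1
val ranged = rdd.partitionBy(partitioner)

// 3. sort each partition locally; concatenating the partitions in order
//    now yields a total order
val sorted = ranged.mapPartitions(
  iter => iter.toSeq.sortBy(_._1).iterator,
  preservesPartitioning = true)

sorted.keys.collect()  // Array(1, 2, 3, 4, 5)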

The steps above are clearly reflected in the debug string:

scala> val rdd = sc.parallelize(Seq(4, 2, 5, 3, 1))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at ...

scala> rdd.sortBy(identity).toDebugString
res1: String =
(6) MapPartitionsRDD[10] at sortBy at <console>:24 []     // Sort partitions
 |  ShuffledRDD[9] at sortBy at <console>:24 []           // Shuffle
 +-(8) MapPartitionsRDD[6] at sortBy at <console>:24 []   // Pre-shuffle steps
    |  ParallelCollectionRDD[0] at parallelize at <console>:21 [] // Parallelize
answered Sep 21 '22 by zero323