
How to sort an RDD in Scala Spark?

Reading the Spark documentation for sortByKey:

sortByKey([ascending], [numTasks]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

Is it possible to return just N results? That is, instead of returning all results, return only the top 10. I could convert the sorted collection to an Array and use the take method, but since that is an O(N) operation, is there a more efficient method?

blue-sky asked May 23 '14 21:05




2 Answers

If you only need the top 10, use rdd.top(10). It avoids sorting, so it is faster.

rdd.top makes one parallel pass through the data, collecting the top N of each partition in a heap, then merges the heaps. For a fixed N it is an O(rdd.count) operation. Sorting would be O(rdd.count log rdd.count) and would incur a lot of data transfer: it does a shuffle, so all of the data would be transmitted over the network.
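The heap-per-partition idea can be sketched in plain Scala, without Spark. This is only an illustration under assumptions: partitions are modeled as nested sequences, and `TopNSketch`, `topOfPartition` and `topN` are hypothetical names, not Spark APIs.

```scala
import scala.collection.mutable

object TopNSketch {
  // Keep only the n largest elements of one partition in a heap of size <= n.
  def topOfPartition(part: Iterator[Int], n: Int): Seq[Int] = {
    // The reversed ordering turns PriorityQueue into a min-heap,
    // so `head` is the smallest element currently kept.
    val heap = mutable.PriorityQueue.empty[Int](Ordering[Int].reverse)
    part.foreach { x =>
      if (heap.size < n) heap.enqueue(x)
      else if (n > 0 && x > heap.head) {
        heap.dequeue() // evict the smallest kept element
        heap.enqueue(x)
      }
    }
    heap.toSeq
  }

  // One pass per partition, then one merge pass over at most
  // (number of partitions * n) candidates. No full sort of the data.
  def topN(partitions: Seq[Seq[Int]], n: Int): Seq[Int] = {
    val candidates = partitions.flatMap(p => topOfPartition(p.iterator, n))
    topOfPartition(candidates.iterator, n).sorted(Ordering[Int].reverse)
  }
}
```

For example, `TopNSketch.topN(Seq(Seq(5, 1, 9), Seq(7, 3), Seq(8, 2, 6)), 3)` yields the three largest values in descending order. Note that in Spark itself, rdd.top(n) returns the largest elements under the implicit ordering, while rdd.takeOrdered(n) returns the smallest.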

Daniel Darabos answered Oct 07 '22 08:10



Most likely you have already perused the source code:

  class OrderedRDDFunctions {
    // <snip>
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
      // Range-partition by key so that the partitions themselves are in key order.
      val part = new RangePartitioner(numPartitions, self, ascending)
      // Every record crosses the shuffle here.
      val shuffled = new ShuffledRDD[K, V, P](self, part)
      // Sort each partition locally by key.
      shuffled.mapPartitions(iter => {
        val buf = iter.toArray
        if (ascending) {
          buf.sortWith((x, y) => x._1 < y._1).iterator
        } else {
          buf.sortWith((x, y) => x._1 > y._1).iterator
        }
      }, preservesPartitioning = true)
    }
  }

And, as you say, the entire dataset must go through the shuffle stage, as the snippet shows.

However, your concern about subsequently invoking take(K) may not be well founded. This operation does NOT cycle through all N items:

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   */
  def take(num: Int): Array[T] = {

So then, it would seem:

O(myRdd.take(K)) << O(myRdd.sortByKey()) ~= O(myRdd.sortByKey().take(K)) (at least for small K) << O(myRdd.sortByKey().collect())
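To see why take(K) does not need to touch every element, here is a rough plain-Scala model of the behaviour the doc comment describes. It is a simplification under assumptions: partitions are modeled as lazily evaluated functions, this sketch scans one partition per round rather than estimating how many more to scan, and `TakeSketch` is a hypothetical name, not Spark's implementation.

```scala
object TakeSketch {
  // Returns the first `num` elements together with the number of
  // partitions that actually had to be evaluated.
  def take[T](partitions: Seq[() => Seq[T]], num: Int): (Seq[T], Int) = {
    val buf = Seq.newBuilder[T]
    var taken = 0
    var scanned = 0
    // Stop as soon as enough elements have been collected.
    while (taken < num && scanned < partitions.length) {
      val chunk = partitions(scanned)().take(num - taken)
      buf ++= chunk
      taken += chunk.length
      scanned += 1
    }
    (buf.result(), scanned)
  }
}
```

With three partitions of three elements each, asking for four elements evaluates only the first two partitions; the third is never computed.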

WestCoastProjects answered Oct 07 '22 07:10
