
How to sort RDD

I have scoreTriplets, an RDD[Array[String]], which I am sorting in the following way:

var ScoreTripletsArray = scoreTriplets.collect()
if (ScoreTripletsArray.size > 0) {
  /* Sort ScoreTripletsArray in place, descending by the score field (index 3) */
  scala.util.Sorting.stableSort(ScoreTripletsArray, (e1: Array[String], e2: Array[String]) => e1(3).toInt > e2(3).toInt)
}

But collect() will be expensive if there are lakhs (hundreds of thousands) of elements.

So I need to sort the RDD by score without using collect().
scoreTriplets is an RDD[Array[String]]; each element of the RDD is an Array holding the following fields, in this order:
EdgeId, sourceID, destID, score, sourceNAme, destNAme, distance

Please give me any reference or hint.

Asked by Sandip Armal Patil, Nov 18 '15 08:11





2 Answers

Because of shuffling, sorting is an expensive operation even without collecting, but you can use the sortBy method:

import scala.util.Random

val data = Seq.fill(10)(Array.fill(3)("") :+ Random.nextInt.toString)
val rdd  = sc.parallelize(data)

val sorted = rdd.sortBy(_.apply(3).toInt)
sorted.take(3)
// Array[Array[String]] = Array(
//   Array("", "", "", -1660860558),
//   Array("", "", "", -1643214719),
//   Array("", "", "", -1206834289))
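Note that sortBy sorts in ascending order by default, while the question asks for descending order by score. A minimal sketch of the descending variant, assuming a local SparkContext and made-up sample rows (the object name, the helper sortByScoreDesc, and the data are hypothetical and only for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SortByScoreDesc {
  // Sort rows by the integer score stored at index 3, highest first.
  // The result is still an RDD, so nothing is pulled to the driver here.
  def sortByScoreDesc(rdd: RDD[Array[String]]): RDD[Array[String]] =
    rdd.sortBy(_(3).toInt, ascending = false)

  def main(args: Array[String]): Unit = {
    // Local context for illustration only; in spark-shell `sc` already exists.
    val conf = new SparkConf().setAppName("sort-desc-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical rows using the first four fields of the question's layout:
      // EdgeId, sourceID, destID, score
      val rdd = sc.parallelize(Seq(
        Array("e1", "1", "2", "10"),
        Array("e2", "2", "3", "42"),
        Array("e3", "3", "4", "7")
      ))
      val sorted = sortByScoreDesc(rdd)
      // take(n) fetches only the first n rows of the sorted RDD.
      sorted.take(2).foreach(row => println(row.mkString(" ")))
    } finally {
      sc.stop()
    }
  }
}
```

In spark-shell the setup is unnecessary; the relevant call would just be scoreTriplets.sortBy(_(3).toInt, ascending = false).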

If you're interested only in the top results, then top and takeOrdered are usually preferred, since they do not need to sort the whole RDD:

import scala.math.Ordering

rdd.takeOrdered(2)(Ordering.by[Array[String], Int](_.apply(3).toInt))
// Array[Array[String]] = 
//   Array(Array("", "", "", -1660860558), Array("", "", "", -1643214719))

rdd.top(2)(Ordering.by[Array[String], Int](_.apply(3).toInt))
// Array[Array[String]] = 
//   Array(Array("", "", "", 1920955686), Array("", "", "", 1597012602))
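To relate the two calls: top(n) with a given ordering should return the same rows as takeOrdered(n) with that ordering reversed, largest score first. The sketch below checks this on made-up rows with distinct scores (with ties the two could pick different rows). The object name, the helper topByScore, and the data are hypothetical, and a local SparkContext is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TopNSketch {
  // Order rows by the integer score at index 3 (the question's layout).
  val byScore: Ordering[Array[String]] =
    Ordering.by[Array[String], Int](_(3).toInt)

  // The n rows with the largest scores, highest first.
  // Each partition keeps at most n candidates, so no full sort is needed.
  def topByScore(rdd: RDD[Array[String]], n: Int): Array[Array[String]] =
    rdd.top(n)(byScore)

  def main(args: Array[String]): Unit = {
    // Local context for illustration only; in spark-shell `sc` already exists.
    val conf = new SparkConf().setAppName("top-n-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(Seq(
        Array("e1", "1", "2", "10"),
        Array("e2", "2", "3", "42"),
        Array("e3", "3", "4", "7")
      ))
      val viaTop = topByScore(rdd, 2).map(_(3).toInt)
      // Same rows via takeOrdered with the ordering reversed.
      val viaTakeOrdered = rdd.takeOrdered(2)(byScore.reverse).map(_(3).toInt)
      println(viaTop.mkString(", "))
      println(viaTakeOrdered.mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

For the original question, this means that if only the highest-scoring rows are needed, scoreTriplets.top(n) with an ordering on the score field may be enough, and the full descending sort can be skipped.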
Answered by zero323, Oct 18 '22 04:10



RDD has a sortBy method (see doc). You can do something like this:

scoreTriplets.sortBy( _(3).toInt )
Answered by ponkin, Oct 18 '22 05:10
