
Order by value in Spark pair RDD


I have a Spark pair RDD of (key, count) whose contents are shown below:

Array[(String, Int)] = Array((a,1), (b,2), (c,1), (d,3))

Using the Spark Scala API, how do I get a new pair RDD that is sorted by value?

Required result: Array((d,3), (b,2), (a,1), (c,1))

Asked by Vijay Innamuri, Nov 17 '14 09:11




2 Answers

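This should work:

// Assuming the pair's second type has an Ordering, which is the case for Int.
// sortBy is ascending by default; pass ascending = false to get the largest values first,
// as in the required result.
rdd.sortBy(_._2, ascending = false) // same as rdd.sortBy(pair => pair._2, ascending = false)

(Though you might want to take the key into account too when there are ties.)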
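
The remark about ties can be sketched as follows. This is only an illustration, not part of the original answer: the object name `SortPairRddByValue`, the helper `sortByValueDesc`, and the local master setting are assumptions made up for the example. It sorts on the composite key `(-count, key)`, so counts come out in descending order and equal counts are ordered by key. Negating the count assumes it is never `Int.MinValue`, which holds for ordinary counts.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SortPairRddByValue {
  // Hypothetical helper: descending by count, ties broken by ascending key.
  // The tuple (-count, key) has an implicit Ordering, so sortBy can use it directly.
  def sortByValueDesc(rdd: RDD[(String, Int)]): RDD[(String, Int)] =
    rdd.sortBy { case (key, count) => (-count, key) }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for the sketch; in spark-shell, reuse the existing sc.
    val conf = new SparkConf().setAppName("sort-by-value").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 1), ("d", 3)))
      // prints: (d,3), (b,2), (a,1), (c,1)
      println(sortByValueDesc(rdd).collect().mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

With a plain `sortBy(_._2, ascending = false)` the relative order of (a,1) and (c,1) is not guaranteed, which is why the key is included in the sort key here.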

Answered by Gábor Bakos, Sep 24 '22 02:09


Sort by key and value in ascending and descending order

val textfile = sc.textFile("file:///home/hdfs/input.txt")
val words = textfile.flatMap(line => line.split(" "))
// Count each word once and reuse the result for every sort below
val counts = words.map(word => (word, 1)).reduceByKey((a, b) => a + b)

// Sort by value in descending order. For ascending order remove the 'false' argument from sortBy
counts.sortBy(_._2, false)
// Sort by value in ascending order
counts.sortBy(_._2)

// Sort by key in ascending order
counts.sortByKey()
// Sort by key in descending order
counts.sortByKey(false)

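The same sort by value can also be done by swapping the key and value and then applying sortByKey:

// Sort by value by swapping key and value and then using sortByKey
val sortbyvalue = words.map(word => (word, 1)).reduceByKey((a, b) => a + b)
// After the swap each element is (count, word)
val descendingSortByvalue = sortbyvalue.map(x => (x._2, x._1)).sortByKey(false)
// toDF needs the session implicits in scope (spark-shell imports them automatically)
descendingSortByvalue.toDF("count", "word").show
// collect() first so the lines are printed on the driver, in sorted order
descendingSortByvalue.collect().foreach { case (count, word) =>
  println(s"$word:$count")
}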
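
Since the question asks for a pair RDD that is still keyed by the original key, the swap approach can be completed by swapping back after the sort. The following is a minimal sketch under stated assumptions: the object name `SwapSortExample`, the helper `sortByValueViaSwap`, the in-memory sample lines, and the local master are all invented for the illustration and replace the file input used above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SwapSortExample {
  // Hypothetical helper: swap to (count, word), sort by the count key
  // in descending order, then swap back to (word, count).
  def sortByValueViaSwap(counts: RDD[(String, Int)]): RDD[(String, Int)] =
    counts.map(_.swap).sortByKey(ascending = false).map(_.swap)

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for the sketch; in spark-shell, reuse the existing sc.
    val conf = new SparkConf().setAppName("swap-sort").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // In-memory sample data standing in for the text file
      val lines = sc.parallelize(Seq("a b b", "d d d c"))
      val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      // collect() brings the sorted pairs to the driver before printing
      sortByValueViaSwap(counts).collect().foreach { case (word, count) =>
        println(s"$word:$count")
      }
    } finally {
      sc.stop()
    }
  }
}
```

Words with equal counts (here `a` and `c`) may appear in either order, because sortByKey only compares the count.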

Answered by Nagaraj Vittal, Sep 22 '22 02:09