
How to find max value in pair RDD?

I have a Spark pair RDD of (key, count) pairs, as shown below:

Array[(String, Int)] = Array((a,1), (b,2), (c,1), (d,3))

How can I find the key with the highest count using the Spark Scala API?

EDIT: The data type of the pair RDD is org.apache.spark.rdd.RDD[(String, Int)].

Vijay Innamuri asked Nov 12 '14 11:11



2 Answers

For a local array, use the Array.maxBy method:

val a = Array(("a",1), ("b",2), ("c",1), ("d",3))
val maxKey = a.maxBy(_._2)
// maxKey: (String, Int) = (d,3)

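Or, for an RDD, use RDD.max with a custom Ordering that compares the counts:

// sc is the SparkContext (for example, the one provided by spark-shell)
val rdd = sc.parallelize(a)
val maxKey2 = rdd.max()(new Ordering[Tuple2[String, Int]]() {
  // Compare pairs by their count (the second element) only
  override def compare(x: (String, Int), y: (String, Int)): Int =
      Ordering[Int].compare(x._2, y._2)
})
// maxKey2 is the whole pair; maxKey2._1 is the key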
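
The anonymous Ordering above can probably be written more compactly. The following is a minimal sketch, not part of the original answer: the object name MaxByCount, the helper keyWithMaxCount, and the local[*] master are hypothetical choices made for illustration. It builds the same by-count ordering with Ordering.by and shows RDD.reduce as an alternative way to keep the larger pair.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object MaxByCount {
  // Orders (key, count) pairs by their count.
  val byCount: Ordering[(String, Int)] = Ordering.by[(String, Int), Int](_._2)

  // Returns the key of the pair with the highest count.
  // Throws on an empty RDD, because RDD.max is built on reduce.
  def keyWithMaxCount(rdd: RDD[(String, Int)]): String =
    rdd.max()(byCount)._1

  // Same result via reduce: keep whichever pair has the larger count.
  def keyWithMaxCountViaReduce(rdd: RDD[(String, Int)]): String =
    rdd.reduce((x, y) => if (x._2 >= y._2) x else y)._1

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is acceptable for this demonstration.
    val conf = new SparkConf().setAppName("max-by-count").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 1), ("d", 3)))
      println(keyWithMaxCount(rdd))          // prints "d"
      println(keyWithMaxCountViaReduce(rdd)) // prints "d"
    } finally {
      sc.stop()
    }
  }
}
```

If several keys share the highest count, which of them is returned is not guaranteed, since the comparison only looks at the count.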
Sergii Lagutin answered Sep 22 '22 15:09


Use takeOrdered(1)(Ordering[Int].reverse.on(_._2)):

val a = Array(("a",1), ("b",2), ("c",1), ("d",3))
val rdd = sc.parallelize(a)
val maxKey = rdd.takeOrdered(1)(Ordering[Int].reverse.on(_._2))
// maxKey: Array[(String, Int)] = Array((d,3))

Quoting the note from RDD.takeOrdered:

This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
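
Since takeOrdered(1) returns an array with at most one element, the key still has to be extracted from it. As a rough sketch (the object name TopByCount and the helper keyWithMaxCountOption are hypothetical, not from the answer), RDD.top can be used with an ordering on the count, which avoids reversing the ordering, and headOption covers the case of an empty RDD:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object TopByCount {
  // Returns Some(key) for the pair with the highest count,
  // or None when the RDD has no elements.
  def keyWithMaxCountOption(rdd: RDD[(String, Int)]): Option[String] =
    rdd
      .top(1)(Ordering.by[(String, Int), Int](_._2)) // largest pair by count
      .headOption                                    // empty array => None
      .map(_._1)                                     // keep only the key
}
```

With the rdd defined above, TopByCount.keyWithMaxCountOption(rdd) should yield Some(d).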

Jacek Laskowski answered Sep 22 '22 15:09