
Spark - how to get top N of rdd as a new rdd (without collecting at the driver)

I am wondering how to filter an RDD so that it only keeps items whose value is among the top N. Usually I would sort the RDD, take the top N items as an array on the driver to find the Nth value, and broadcast that as a threshold to filter the RDD, like so:

val topNvalues = sc.broadcast(rdd.map(_.fieldToThreshold).distinct.sortBy(identity, ascending = false).take(N))
val threshold = topNvalues.value.last
val rddWithTopNValues = rdd.filter(_.fieldToThreshold >= threshold)

but in this case my N is too large, so how can I do this purely with RDDs? Something like:

def getExpensiveItems(itemPrices: RDD[(Int, Float)], count: Int): RDD[(Int, Float)] = {
  val sortedPrices = itemPrices.sortBy(-_._2).map(_._1).distinct

  // How to do this without collecting results to the driver??
  val highPrices = itemPrices.getTopNValuesWithoutCollect(count)

  itemPrices.join(highPrices.keyBy(x => x)).map(_._2._1)
}
asked Nov 29 '17 by anthonybell

People also ask

What are the RDD operations in spark?

1. Spark RDD Operations. There are two types of Apache Spark RDD operations: transformations and actions. A transformation is a function that produces a new RDD from existing RDDs, while an action is performed when we want to work with the actual dataset. When an action is triggered, it returns a result to the driver instead of forming a new RDD, unlike a transformation.
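The distinction can be sketched in a few lines (assuming a `SparkContext` named `sc` is already in scope):

```scala
// Transformations are lazy: nothing is computed yet.
val nums = sc.parallelize(1 to 10)  // create an RDD
val doubled = nums.map(_ * 2)       // transformation: returns a new RDD
val large = doubled.filter(_ > 10)  // transformation: still lazy

// Actions trigger the actual computation and return a value to the driver.
val total = large.sum()             // action: Double
val items = large.collect()         // action: Array[Int]
```

Nothing runs on the cluster until `sum()` or `collect()` is called.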

How to create RDD in Apache Spark?

There are three ways to create an RDD in Spark:

1. From a parallelized collection.
2. From an external dataset (referencing a dataset in an external storage system).
3. From an existing Apache Spark RDD.
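All three ways can be shown in a short sketch (assuming a `SparkContext` named `sc`; the HDFS path is a placeholder, not a real dataset):

```scala
// 1. From a parallelized collection
val fromCollection = sc.parallelize(Seq(1, 2, 3, 4))

// 2. From an external dataset (hypothetical path)
val fromFile = sc.textFile("hdfs:///data/items.txt")

// 3. From an existing RDD, via a transformation
val fromExisting = fromCollection.map(_ * 10)
```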

What does rdd1.union(rdd2) do?

The key rule of this function is that the two RDDs should be of the same type. For example, if the elements of rdd1 are (Spark, Spark, Hadoop, Flink) and those of rdd2 are (Big data, Spark, Flink), then rdd1.union(rdd2) will have the elements (Spark, Spark, Spark, Hadoop, Flink, Flink, Big data).
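The example above can be reproduced directly (assuming a `SparkContext` named `sc`):

```scala
val rdd1 = sc.parallelize(Seq("Spark", "Spark", "Hadoop", "Flink"))
val rdd2 = sc.parallelize(Seq("Big data", "Spark", "Flink"))

// union keeps duplicates; all seven elements survive.
// Chain .distinct afterwards if duplicates should be dropped.
val combined = rdd1.union(rdd2)
```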

What are transformations and actions in spark RDDs?

Spark’s RDDs support two types of operations, namely transformations and actions. Once the RDDs are created we can perform transformations and actions on them. Transformations are operations on the RDDs that create a new RDD by making changes to the original RDD.


1 Answer

Use zipWithIndex on the sorted RDD and then filter by the index to keep the first n items. To illustrate, consider this RDD sorted in descending order:

val rdd = sc.parallelize((1 to 10).map( _ => math.random)).sortBy(-_)

Then

rdd.zipWithIndex.filter(_._2 < 4)

delivers the top four items without collecting the RDD to the driver.
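Applying this to the question's `getExpensiveItems`, one way to write it without collecting to the driver might look like the following sketch (untested at scale; note the joins will shuffle):

```scala
import org.apache.spark.rdd.RDD

def getExpensiveItems(itemPrices: RDD[(Int, Float)], count: Int): RDD[(Int, Float)] = {
  // Sort the distinct prices descending, index them, keep the top `count`.
  val topPrices = itemPrices
    .map(_._2)
    .distinct
    .sortBy(identity, ascending = false)
    .zipWithIndex
    .filter(_._2 < count)
    .map(_._1)

  // Join back on price to recover the (id, price) pairs.
  itemPrices
    .map { case (id, price) => (price, id) }
    .join(topPrices.keyBy(identity))
    .map { case (price, (id, _)) => (id, price) }
}
```

Everything here is a transformation, so the top-N values stay distributed as an RDD rather than an array on the driver.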

answered Jan 04 '23 by elm