I am wondering how to filter an RDD down to the elements that have one of the top N values. Usually I would sort the RDD, take the top
N items as an array in the driver to find the Nth value, and broadcast it to filter the RDD, like so:
val topNvalues = sc.broadcast(rdd.map(_.fieldToThreshold).distinct().sortBy(x => x, ascending = false).take(N))
val threshold = topNvalues.value.last
val rddWithTopNValues = rdd.filter(_.fieldToThreshold >= threshold)
But in this case my N is too large to collect to the driver, so how can I do this purely with RDDs, along these lines?
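def getExpensiveItems(itemPrices: RDD[(Int, Float)], count: Int): RDD[(Int, Float)] = {
  // distinct prices, highest first
  val sortedPrices = itemPrices.map(_._2).distinct().sortBy(p => -p)
  // How to do this without collecting results to the driver??
  // (getTopNValuesWithoutCollect is a hypothetical placeholder, not an RDD method)
  val highPrices = sortedPrices.getTopNValuesWithoutCollect(count)
  // key the items by price and keep only those whose price is in highPrices
  itemPrices.map(_.swap).join(highPrices.keyBy(x => x)).map { case (price, (id, _)) => (id, price) }
}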
Spark RDD operations come in two types: transformations and actions. A transformation is a function that produces a new RDD from existing RDDs and is evaluated lazily; an action is performed when we want to work with the actual dataset. Unlike a transformation, an action does not form a new RDD: it triggers the computation and returns a result to the driver or writes it to storage.
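A minimal sketch of the difference, assuming a local SparkContext (the object name TransformVsAction and the local master URL are illustrative choices, not from the original): filter and map only describe new RDDs, while collect is an action that runs the job and hands a plain Array back to the driver.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TransformVsAction {
  // Squares of the even numbers in 1..n, computed by Spark
  def evenSquares(sc: SparkContext, n: Int): Array[Int] = {
    val numbers: RDD[Int] = sc.parallelize(1 to n)
    // Transformations: each returns a new RDD; nothing is computed yet
    val evens: RDD[Int] = numbers.filter(_ % 2 == 0)
    val squares: RDD[Int] = evens.map(x => x * x)
    // Action: triggers the job and returns an Array (not an RDD) to the driver
    squares.collect()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("TransformVsAction"))
    try {
      println(evenSquares(sc, 10).mkString(", ")) // 4, 16, 36, 64, 100
    } finally sc.stop()
  }
}
```

Because the pipeline is lazy, Spark can fuse filter and map into a single pass over each partition once collect is called.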
There are three ways to create an RDD in Spark:
1. From a parallelized collection.
2. From an external dataset (referencing a dataset in an external storage system).
3. From an existing Spark RDD.
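The three creation routes can be sketched as follows, assuming a local SparkContext; the object name CreateRdds, the sample strings and the path parameter are illustrative, not from the original.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CreateRdds {
  // 1. From a parallelized collection held in the driver
  def fromCollection(sc: SparkContext): RDD[String] =
    sc.parallelize(Seq("Spark", "Hadoop", "Flink"))

  // 2. From an external dataset: one element per line of the file(s) at `path`
  //    (a local path, or an HDFS/S3 URI if the cluster is configured for it)
  def fromExternal(sc: SparkContext, path: String): RDD[String] =
    sc.textFile(path)

  // 3. From an existing RDD, by applying a transformation
  def fromExisting(rdd: RDD[String]): RDD[Int] =
    rdd.map(_.length)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("CreateRdds"))
    try {
      val names = fromCollection(sc)
      println(fromExisting(names).collect().mkString(", ")) // 5, 6, 5
    } finally sc.stop()
  }
}
```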
For the union transformation, the key rule is that the two RDDs must have the same element type. For example, if the elements of rdd1 are (Spark, Spark, Hadoop, Flink) and those of rdd2 are (Big data, Spark, Flink), then rdd1.union(rdd2) contains all seven elements, duplicates included: (Spark, Spark, Hadoop, Flink, Big data, Spark, Flink).
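The union example above, as a small sketch assuming a local SparkContext (the object name UnionDemo is illustrative). union simply concatenates the partitions of both RDDs, so duplicates survive; distinct() would remove them at the cost of a shuffle.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UnionDemo {
  // Both arguments must be RDD[String]; union keeps every element, duplicates included
  def combine(a: RDD[String], b: RDD[String]): RDD[String] = a.union(b)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("UnionDemo"))
    try {
      val rdd1 = sc.parallelize(Seq("Spark", "Spark", "Hadoop", "Flink"))
      val rdd2 = sc.parallelize(Seq("Big data", "Spark", "Flink"))
      // Spark, Spark, Hadoop, Flink, Big data, Spark, Flink
      println(combine(rdd1, rdd2).collect().mkString(", "))
      // Removing duplicates needs an explicit distinct(); its output order is not guaranteed
      println(combine(rdd1, rdd2).distinct().count()) // 4
    } finally sc.stop()
  }
}
```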
Once RDDs are created, we can perform transformations and actions on them. A transformation creates a new RDD from an existing one; the original RDD is left unchanged, because RDDs are immutable.
Use zipWithIndex on the sorted RDD and then filter by index to keep the first n items. To illustrate, consider this RDD sorted in descending order:
val rdd = sc.parallelize((1 to 10).map( _ => math.random)).sortBy(-_)
Then
rdd.zipWithIndex.filter(_._2 < 4).map(_._1)
yields the top four values (dropping the indices again) without collecting the RDD to the driver.
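A minimal sketch applying this answer to the question's getExpensiveItems, assuming the Spark Scala RDD API; ranking over distinct prices (so tied items are all kept), the Unit-valued join and the sample data are illustrative choices, not the answerer's code.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ExpensiveItems {
  // Keeps every (itemId, price) pair whose price is among the `count`
  // highest distinct prices; no prices are collected to the driver
  def getExpensiveItems(itemPrices: RDD[(Int, Float)], count: Int): RDD[(Int, Float)] = {
    // Distinct prices, highest first
    val sortedPrices: RDD[Float] =
      itemPrices.map(_._2).distinct().sortBy(p => p, ascending = false)

    // zipWithIndex numbers elements by partition order, then by position
    // within each partition, so indices 0 .. count-1 are the highest prices
    val highPrices: RDD[(Float, Unit)] = sortedPrices.zipWithIndex()
      .filter { case (_, idx) => idx < count }
      .map { case (price, _) => (price, ()) }

    // Re-key items by price and keep those whose price survived the filter
    itemPrices.map(_.swap)
      .join(highPrices)
      .map { case (price, (id, _)) => (id, price) }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("ExpensiveItems"))
    try {
      val items = sc.parallelize(Seq((1, 10.0f), (2, 30.0f), (3, 20.0f), (4, 30.0f), (5, 5.0f)))
      // The two highest distinct prices are 30.0 and 20.0, so items 2, 3 and 4 remain
      getExpensiveItems(items, 2).collect().sortBy(_._1).foreach(println)
    } finally sc.stop()
  }
}
```

Note that zipWithIndex runs an extra Spark job to count the elements of each partition when the RDD has more than one partition, and the join shuffles itemPrices. In exchange, the set of high prices never has to fit in the driver, which is the constraint in the question.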