
How do I select a range of elements in Spark RDD?


I'd like to select a range of elements in a Spark RDD. For example, I have an RDD with a hundred elements, and I need to select elements from 60 to 80. How do I do that?

I see that RDD has a take(num: Int) method, which returns the first num elements. But there is no corresponding method to take the last i elements, or i elements from the middle starting at a certain index.

PlinyTheElder asked Jul 10 '14 12:07



1 Answer

I don't think there is an efficient method to do this yet. The easy way is to use filter(). Let's say you have an RDD, pairs, of key-value pairs and you only want the elements with keys from 60 to 80 inclusive. Then just do:

    // A Scala identifier cannot start with a digit, hence from60to80.
    val from60to80 = pairs.filter {
      case (k, v) => k >= 60 && k <= 80
      case _ => false // in case of invalid input
    }

I think this could be done more efficiently in the future by using sortByKey and saving information about the range of keys mapped to each partition. Keep in mind that this approach only pays off if you plan to query the range multiple times, because the sort is expensive.

From looking at the Spark source, it would definitely be possible to do efficient range queries using RangePartitioner:

    // An array of upper bounds for the first (partitions - 1) partitions
    private val rangeBounds: Array[K] = {

rangeBounds is a private member of RangePartitioner that holds the upper bounds of the partitions, so it would be easy to query only the necessary partitions. It looks like this is something Spark users may see in the future: SPARK-911
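To illustrate why knowing the upper bounds is enough to skip partitions, here is a minimal plain-Scala sketch that needs no Spark. The object name, the bounds values, and both helper functions are hypothetical; they only imitate the idea of mapping a key to the first partition whose upper bound is not smaller than the key, and are not RangePartitioner's actual code.

```scala
object RangeBoundsSketch {
  // Hypothetical upper bounds for the first (partitions - 1) partitions.
  // Partition i holds keys <= bounds(i); the last partition holds the rest.
  val bounds: Array[Int] = Array(25, 50, 75)

  // Index of the partition that would hold `key`: the first bound >= key,
  // or the last partition if the key exceeds every bound.
  def partitionFor(key: Int): Int = {
    val i = bounds.indexWhere(key <= _)
    if (i >= 0) i else bounds.length
  }

  // Only partitions in this inclusive index range can hold keys in [lower, upper].
  def partitionsFor(lower: Int, upper: Int): Range =
    partitionFor(lower) to partitionFor(upper)

  def main(args: Array[String]): Unit =
    println(partitionsFor(60, 80))
}
```

With these assumed bounds, a query for keys 60 to 80 only has to touch the last two of the four partitions; the first two can be skipped without reading them.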

UPDATE: A much better answer, based on the pull request I'm writing for SPARK-911. It runs efficiently if the RDD is sorted and you query it multiple times.

    import org.apache.spark.RangePartitioner

    // Sort once and cache so that repeated range queries reuse the sorted partitions.
    val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
    val p: RangePartitioner[Int, Int] =
      sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]]
    val (lower, upper) = (10, 20)
    // Indices of the partitions that can contain keys in [lower, upper].
    val range = p.getPartition(lower) to p.getPartition(upper)
    println(range)
    // Scan only the partitions in `range` and skip all the others.
    val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
      if (range.contains(i))
        for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
      else
        Iterator.empty
    }
    for ((k, v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect())
      println(s"$k, $v")

If having a whole partition in memory is acceptable, you could even do something like this:

    val glommedAndCached = sorted.glom().cache()
    glommedAndCached.map(a => a.slice(a.search(lower), a.search(upper) + 1)).collect()

By the way, search is not a member of Array here. I wrote an implicit class that adds a binary search function, which is not shown.
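Since the helper is not shown, here is one way such an implicit class might look. This is a sketch and not the answerer's code: the names SortedSliceSketch, SortedPairs, firstIndexWhere and sliceByKey are all made up for illustration, and it assumes each glommed partition is an Array[(Int, Int)] sorted by key in ascending order. Instead of search(upper) + 1, it looks for the first key greater than upper, so it also works when upper itself is not present in the array.

```scala
object SortedSliceSketch {
  // Hypothetical stand-in for the unshown helper.
  // Assumes `pairs` is sorted by key in ascending order.
  implicit class SortedPairs(val pairs: Array[(Int, Int)]) {
    // Binary search for the first index whose key satisfies `pred`.
    // `pred` must be false for a prefix of the keys and true for the rest.
    // Returns pairs.length if no key satisfies it.
    private def firstIndexWhere(pred: Int => Boolean): Int = {
      var lo = 0
      var hi = pairs.length
      while (lo < hi) {
        val mid = lo + (hi - lo) / 2
        if (pred(pairs(mid)._1)) hi = mid else lo = mid + 1
      }
      lo
    }

    // All elements with lower <= key <= upper.
    def sliceByKey(lower: Int, upper: Int): Array[(Int, Int)] =
      pairs.slice(firstIndexWhere(_ >= lower), firstIndexWhere(_ > upper))
  }

  def main(args: Array[String]): Unit = {
    val partition = (1 to 100).map(x => (x, x)).toArray
    println(partition.sliceByKey(60, 80).map(_._1).mkString(", "))
  }
}
```

With the implicit class in scope, the glom-based query could then be written as glommedAndCached.map(_.sliceByKey(lower, upper)).collect(), again assuming every partition array is sorted.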

aaronman answered Oct 13 '22 16:10