 

Merge multiple RDD generated in loop

I am calling a function in Scala which returns an RDD[(Long, Long, Double)] as its output.

def helperfunction(): RDD[(Long, Long, Double)]

I call this function in a loop in another part of the code and I want to merge all the generated RDDs. The loop calling the function looks something like this:

for (i <- 1 to n){
    val tOp = helperfunction()
    // merge the generated tOp
}

What I want to do is something similar to what a StringBuilder does for you in Java when you want to merge strings. I have looked at techniques for merging RDDs, which mostly point to using the union function, like this:

RDD1.union(RDD2)

But this requires both RDDs to be generated before taking their union. I thought of initializing a var RDD1 outside the for loop to accumulate the results, but I am not sure how I can initialize a blank RDD of type [(Long, Long, Double)]. Also, I am just starting out with Spark, so I am not even sure this is the most elegant way to solve the problem.

asked Mar 15 '16 by Shantanu Deshpande

People also ask

Can an RDD be split?

A mapPartitions transformation can then be used to construct multiple RDDs from the partitioned RDD without reading all of the data. Just be aware that the number of partitions in the filtered RDDs will be the same as the number in the partitioned RDD, so a coalesce should be used to reduce this down and remove the empty partitions.
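
For example, a rough sketch of that idea using mapPartitionsWithIndex (the sample data and the four-way split here are made up for illustration):

import org.apache.spark.HashPartitioner

// Hypothetical pair RDD; the key (x % 4) decides which split a record belongs to
val pairs = sc.parallelize(1 to 100).map(x => (x % 4, x))

// Co-locate each key in its own partition and cache so the source is read only once
val partitioned = pairs.partitionBy(new HashPartitioner(4)).cache()

// One RDD per partition, without re-reading all the data
val splits = (0 until 4).map { idx =>
  partitioned
    .mapPartitionsWithIndex((i, iter) => if (i == idx) iter else Iterator.empty,
                            preservesPartitioning = true)
    .coalesce(1) // drop the now-empty partitions
}

splits.foreach(s => println(s.count()))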

Can data in RDD be changed once RDD is created?

RDDs are immutable in nature, i.e. we cannot change an RDD; we need to transform it by applying transformation(s).
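
A minimal illustration (the numbers are arbitrary): a transformation such as map returns a new RDD and leaves the original untouched.

val rdd = sc.parallelize(Seq(1, 2, 3))

// map does not modify rdd; it produces a brand-new RDD
val doubled = rdd.map(_ * 2)

println(rdd.collect().mkString(","))     // 1,2,3 -- unchanged
println(doubled.collect().mkString(",")) // 2,4,6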

What is the name of method used to get an RDD from a simple collection?

RDDs are generally created with the parallelize method, which takes an existing collection from the driver program.
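
For instance, a small sketch (the local collection is made up, chosen to match the asker's tuple type):

// A local Scala collection living in the driver program...
val localData = Seq((1L, 10L, 1.0), (2L, 20L, 2.0), (3L, 30L, 3.0))

// ...becomes a distributed RDD[(Long, Long, Double)] via parallelize
val rdd = sc.parallelize(localData)

println(rdd.count()) // 3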


2 Answers

Instead of using vars, you can use functional programming paradigms to achieve what you want:

val rdd = (1 to n).map(_ => helperfunction()).reduce(_ union _)

Also, if you still need to create an empty RDD, you can do it using:

val empty = sc.emptyRDD[(Long, Long, Double)]
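
As a follow-up, if n is large you may prefer SparkContext.union, which merges a whole sequence of RDDs in one call rather than chaining binary unions; folding from an empty RDD also handles the case where the sequence is empty, which reduce would choke on (helperfunction below is the asker's function, assumed to be in scope):

// Generate all the RDDs first, then merge them in a single call
val rdds = (1 to n).map(_ => helperfunction())
val merged = sc.union(rdds)

// Or accumulate from an empty RDD, which also works when rdds is empty
val merged2 = rdds.fold(sc.emptyRDD[(Long, Long, Double)])(_ union _)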
answered Sep 23 '22 by Lezzar Walid

You're correct that this might not be the optimal way to do this, but we would need more info on what you're trying to accomplish with generating a new RDD with each call to your helper function.

You could define one RDD prior to the loop, assign it to a var, and then union onto it inside your loop. Here's an example:

// Build a sample RDD of (Long, Long, Double) tuples to stand in for the helper function's output
val rdd = sc.parallelize(1 to 100)
val rdd_tuple = rdd.map(x => (x.toLong, (x*10).toLong, x.toDouble))

// Accumulate into a var, starting from the first result
var new_rdd = rdd_tuple
println("Initial RDD count: " + new_rdd.count())    // 100

// Each iteration unions another copy onto the accumulator
for (i <- 2 to 4) {
  new_rdd = new_rdd.union(rdd_tuple)
}
println("New count after loop: " + new_rdd.count()) // 400
answered Sep 19 '22 by MrChristine