I am calling a function in Scala which returns an RDD[(Long, Long, Double)] as its output:
def helperfunction(): RDD[(Long, Long, Double)]
I call this function in a loop in another part of the code, and I want to merge all the generated RDDs. The loop calling the function looks something like this:
for (i <- 1 to n) {
  val tOp = helperfunction()
  // merge the generated tOp
}
What I want to do is something similar to what a StringBuilder does for you in Java when you want to merge strings. I have looked at techniques for merging RDDs, which mostly point to using the union function, like this:
RDD1.union(RDD2)
But this requires both RDDs to be generated before taking their union. I thought of initializing a var RDD1 outside the for loop to accumulate the results, but I am not sure how I can initialize a blank RDD of type [(Long, Long, Double)]. Also, I am just starting out with Spark, so I am not even sure if this is the most elegant way to solve this problem.
mapPartitionsWithIndex can then be used to construct multiple RDDs from the partitioned RDD without reading all the data. Just be aware that the number of partitions in each filtered RDD will be the same as the number in the partitioned RDD, so a coalesce should be used to reduce this down and remove the empty partitions.
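A minimal sketch of that idea, assuming a pair RDD keyed by Long (the names pairRdd and numPieces are illustrative, not from the question):
import org.apache.spark.HashPartitioner
val pairRdd = sc.parallelize(1 to 100).map(x => (x.toLong, x.toDouble))
val numPieces = 4
val partitioned = pairRdd.partitionBy(new HashPartitioner(numPieces))
// One RDD per partition; partitions other than i emit nothing
val pieces = (0 until numPieces).map { i =>
  partitioned
    .mapPartitionsWithIndex((idx, it) => if (idx == i) it else Iterator.empty)
    .coalesce(1) // collapse the now-empty partitions
}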
RDDs are immutable in nature, i.e. we cannot change an RDD in place; we transform it by applying transformations, each of which returns a new RDD.
RDDs are generally created with the parallelize method, which takes an existing collection from our driver program.
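For instance, a minimal sketch in the spark-shell (the tuple values are illustrative):
val source = sc.parallelize(Seq((1L, 10L, 1.0), (2L, 20L, 2.0))) // RDD[(Long, Long, Double)]
val doubled = source.map { case (a, b, c) => (a, b, 2 * c) }     // a new RDD; source itself is unchanged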
Instead of using vars, you can use functional programming paradigms to achieve what you want:
val rdd = (1 to n).map(_ => helperfunction()).reduce(_ union _)
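One caveat: reducing with union builds a nested chain of n unions. SparkContext.union accepts a whole sequence of RDDs at once, which keeps the lineage flat:
val merged = sc.union((1 to n).map(_ => helperfunction()))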
Also, if you still need to create an empty RDD, you can do it using:
val empty = sc.emptyRDD[(Long, Long, Double)]
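For instance, such an empty RDD can serve as the seed of a fold, a sketch equivalent to the loop in the question:
import org.apache.spark.rdd.RDD
val empty: RDD[(Long, Long, Double)] = sc.emptyRDD
val merged = (1 to n).foldLeft(empty)((acc, _) => acc.union(helperfunction()))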
You're correct that this might not be the optimal way to do this, but we would need more info on what you're trying to accomplish with generating a new RDD with each call to your helper function.
You could define one RDD prior to the loop, assign it to a var, and then union onto it inside your loop. Here's an example:
// Build a starting RDD[(Long, Long, Double)] to stand in for your helper function's output
val rdd = sc.parallelize(1 to 100)
val rdd_tuple = rdd.map(x => (x.toLong, (x * 10).toLong, x.toDouble))
// Accumulate into a var; each union returns a new RDD that replaces the old reference
var new_rdd = rdd_tuple
println("Initial RDD count: " + new_rdd.count())
for (i <- 2 to 4) {
  new_rdd = new_rdd.union(rdd_tuple)
}
println("New count after loop: " + new_rdd.count())