I would like to create an RDD to collect the results of an iterative calculation .
How can I use a loop (or any alternative) to replace the following code:
import org.apache.spark.mllib.random.RandomRDDs._
val n = 10
val step1 = normalRDD(sc, n, seed = 1 )
val step2 = normalRDD(sc, n, seed = (step1.max).toLong )
val result1 = step1.zip(step2)
val step3 = normalRDD(sc, n, seed = (step2.max).toLong )
val result2 = result1.zip(step3)
...
val step50 = normalRDD(sc, n, seed = (step49.max).toLong )
val result49 = result48.zip(step50)
(creating the N step RDDs and zipping then together at the end would also be ok as long the 50 RDDs are created iteratively to respect the seed = (step(n-1).max) condition)
A recursive function would work:
/**
* The return type is an Option to handle the case of a user specifying
* a non positive number of steps.
*/
def createZippedNormal(sc : SparkContext,
numPartitions : Int,
numSteps : Int) : Option[RDD[Double]] = {
@scala.annotation.tailrec
def accum(sc : SparkContext,
numPartitions : Int,
numSteps : Int,
currRDD : RDD[Double],
seed : Long) : RDD[Double] = {
if(numSteps <= 0) currRDD
else {
val newRDD = normalRDD(sc, numPartitions, seed)
accum(sc, numPartitions, numSteps - 1, currRDD.zip(newRDD), newRDD.max)
}
}
if(numSteps <= 0) None
else Some(accum(sc, numPartitions, numSteps, sc.emptyRDD[Double], 1L))
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With