I get the error message SPARK-5063 in the line of println
val d.foreach{x=> for(i<-0 until x.length)
println(m.lookup(x(i)))}
d is RDD[Array[String]]
m is RDD[(String, String)]
. Is there any way to print as the way I want? or how can i convert d from RDD[Array[String]]
to Array[String]
?
SPARK-5063 relates to better error messages when trying to nest RDD operations, which is not supported.
It's a usability issue, not a functional one. The root cause is the nesting of RDD operations and the solution is to break that up.
Here we are trying a join of dRDD
and mRDD
. If the size of mRDD
is large, a rdd.join
would be the recommended way otherwise, if mRDD
is small, i.e. fits in memory of each executor, we could collect it, broadcast it and do a 'map-side' join.
A simple join would go like this:
val rdd = sc.parallelize(Seq(Array("one","two","three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six"->6))
val flat = rdd.flatMap(_.toSeq).keyBy(x=>x)
val res = flat.join(map).map{case (k,v) => v}
If we would like to use broadcast, we first need to collect the value of the resolution table locally in order to b/c that to all executors. NOTE the RDD to be broadcasted MUST fit in the memory of the driver as well as of each executor.
val rdd = sc.parallelize(Seq(Array("one","two","three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six"->6)))
val bcTable = sc.broadcast(map.collectAsMap)
val res2 = rdd.flatMap{arr => arr.map(elem => (elem, bcTable.value(elem)))}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With