I was wondering what the difference is between using Spark's mapPartitions
functionality and a transient lazy val.
Since each partition basically runs on a different node, a single instance of a transient lazy val will be created on each node (assuming it's in an object).
For example:
class NotSerializable(v: Int) {
  def foo(a: Int) = ???
}

object OnePerPartition {
  @transient lazy val obj: NotSerializable = new NotSerializable(10)
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Test extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  val sc = new SparkContext(conf)

  val rdd: RDD[Int] = sc.parallelize(1 to 100000)

  rdd.map(OnePerPartition.obj.foo)

  // ---------- VS ----------

  rdd.mapPartitions(itr => {
    val obj = new NotSerializable(10)
    itr.map(obj.foo)
  })
}
One might ask why you would even want this...
I would like to create a general container notion for running my logic on any generic collection implementation (RDD, List, a Scalding pipe, etc.).
All of them have a notion of "map", but mapPartitions is unique to Spark.
First of all, you don't need @transient lazy val here. Using an object wrapper is enough to make this work, and you can actually write it as:

object OnePerExecutor {
  val obj: NotSerializable = new NotSerializable(10)
}
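The reason the explicit `@transient lazy val` is redundant: a Scala `object` is itself a lazily initialized, one-per-JVM singleton. A minimal Spark-free sketch illustrating this (the body of `foo` is hypothetical, since the question leaves it as `???`):

```scala
class NotSerializable(v: Int) {
  def foo(a: Int): Int = a + v // hypothetical body; the original uses ???
}

object OnePerExecutor {
  // Initialization of an object is lazy and happens at most once per JVM
  // (per classloader), so no @transient lazy val is required.
  val obj: NotSerializable = new NotSerializable(10)
}

object Demo extends App {
  val a = OnePerExecutor.obj
  val b = OnePerExecutor.obj
  println(a eq b) // prints true: both references see the same instance
}
```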
There is a fundamental difference between the object wrapper and initializing NotSerializable inside mapPartitions. This:

rdd.mapPartitions(iter => {
  val ns = new NotSerializable(1)
  ???
})
creates a single NotSerializable instance per partition.
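To make this concrete, the per-partition behavior can be simulated without Spark by treating a `Seq[Seq[Int]]` as a partitioned dataset (the instantiation counter and `foo`'s body are purely illustrative):

```scala
import java.util.concurrent.atomic.AtomicInteger

object InstanceCounter {
  val created = new AtomicInteger(0) // counts constructor calls
}

class NotSerializable(v: Int) {
  InstanceCounter.created.incrementAndGet()
  def foo(a: Int): Int = a + v // hypothetical body
}

object PerPartitionDemo extends App {
  // three "partitions" of data
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6))

  // mapPartitions-style: build one instance per partition and reuse it
  // for every element of that partition
  val result = partitions.flatMap { part =>
    val obj = new NotSerializable(10)
    part.map(obj.foo)
  }

  println(InstanceCounter.created.get) // prints 3: one instance per partition
}
```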
The object wrapper, on the other hand, creates a single NotSerializable instance per executor JVM. As a result, this instance is shared between all tasks running on the same executor, which means it must be thread-safe and its method calls should be free of side effects.
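What "thread safe" means in practice here: tasks for different partitions may run concurrently in the same executor JVM, all touching the shared instance. A sketch of a safe variant, where the only mutable state (a hypothetical call counter, not part of the original class) goes through an atomic:

```scala
import java.util.concurrent.atomic.AtomicLong

class NotSerializableSafe(v: Int) {
  private val calls = new AtomicLong(0) // safe under concurrent task threads

  def foo(a: Int): Int = {
    calls.incrementAndGet() // the only side effect, and it is atomic
    a + v                   // otherwise a pure computation
  }

  def callCount: Long = calls.get
}

object SharedPerExecutor {
  val obj: NotSerializableSafe = new NotSerializableSafe(10)
}

object ThreadSafetyDemo extends App {
  // simulate several concurrent tasks hammering the shared instance
  val threads = (1 to 4).map { _ =>
    new Thread(() => (1 to 1000).foreach(SharedPerExecutor.obj.foo))
  }
  threads.foreach(_.start())
  threads.foreach(_.join())

  println(SharedPerExecutor.obj.callCount) // prints 4000: no updates lost
}
```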