I was wondering what the difference is between using Spark's mapPartitions functionality and a transient lazy val.
Since each partition basically runs on a different node, a single instance of a transient lazy val will be created on each node (assuming it's in an object).
For example:
class NotSerializable(v: Int) {
  def foo(a: Int) = ???
}

object OnePerPartition {
  @transient lazy val obj: NotSerializable = new NotSerializable(10)
}
object Test extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  val sc = new SparkContext(conf)

  val rdd: RDD[Int] = sc.parallelize(1 to 100000)

  rdd.map(OnePerPartition.obj.foo)

  // ---------- VS ----------

  rdd.mapPartitions(itr => {
    val obj = new NotSerializable(10)
    itr.map(obj.foo)
  })
}
One might ask why you would even want this...
I would like to create a general container abstraction for running my logic on any generic collection implementation (RDD, List, Scalding pipe, etc.).
All of them have a notion of "map", but mapPartitions is unique to Spark.
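To make the goal concrete, here is a minimal sketch of what I have in mind, using hypothetical names (Mappable, runLogic). An RDD instance of the typeclass would look the same, but it is omitted here so the snippet compiles without a Spark dependency:

```scala
// Hypothetical typeclass capturing the shared "map" notion.
trait Mappable[F[_]] {
  def map[A, B](fa: F[A])(f: A => B): F[B]
}

object Mappable {
  // Instance for a plain Scala List; an RDD instance would delegate
  // to rdd.map (or, Spark-specifically, rdd.mapPartitions).
  implicit val listMappable: Mappable[List] = new Mappable[List] {
    def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
  }
}

// The business logic is written once, against the abstraction.
def runLogic[F[_]](fa: F[Int])(implicit M: Mappable[F]): F[Int] =
  M.map(fa)(_ * 2)
```

With the List instance in scope, runLogic(List(1, 2, 3)) returns List(2, 4, 6).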
First of all, you don't need @transient lazy here. The object wrapper alone is enough to make this work, so you can actually write it as:
object OnePerExecutor {
  val obj: NotSerializable = new NotSerializable(10)
}
There is a fundamental difference between the object wrapper and initializing NotSerializable inside mapPartitions. This:

rdd.mapPartitions(iter => {
  val ns = new NotSerializable(1)
  ???
})
creates a single NotSerializable instance per partition.
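You can see the same per-partition behavior without Spark in a small sketch (hypothetical names), treating each sub-list as a partition and counting constructor calls:

```scala
import java.util.concurrent.atomic.AtomicInteger

val created = new AtomicInteger(0)

// Stand-in for NotSerializable that records how often it is constructed.
class PerPartitionResource {
  created.incrementAndGet()
  def inc(x: Int): Int = x + 1
}

// Three "partitions" holding five elements in total; the
// mapPartitions-style closure builds one instance per partition,
// not one per element.
val partitions: List[List[Int]] = List(List(1, 2), List(3, 4), List(5))
val result = partitions.map { part =>
  val res = new PerPartitionResource
  part.map(res.inc)
}
```

After this runs, created.get is 3: one instance per partition, regardless of the number of elements.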
The object wrapper, on the other hand, creates a single NotSerializable instance per executor JVM, shared by all tasks running concurrently on that executor. As a result, this instance must be thread-safe, and its method calls should be free of side effects.
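Note the thread-safety concern applies to the instance's own methods; the initialization itself is safe, because the JVM guarantees that an object's lazy val is initialized exactly once even under concurrent access. A minimal sketch with hypothetical names:

```scala
import java.util.concurrent.atomic.AtomicInteger

object InitCounter {
  val constructed = new AtomicInteger(0)
}

// Stand-in for NotSerializable that counts constructor calls.
class ExpensiveResource {
  InitCounter.constructed.incrementAndGet()
}

object OnePerJvm {
  lazy val resource: ExpensiveResource = new ExpensiveResource
}

// Access the lazy val from several threads, as concurrent Spark tasks
// on one executor would; initialization still happens exactly once.
val threads = (1 to 8).map(_ => new Thread(new Runnable {
  def run(): Unit = { OnePerJvm.resource; () }
}))
threads.foreach(_.start())
threads.foreach(_.join())
```

After all threads join, InitCounter.constructed.get is 1, no matter how many tasks raced on the first access.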