Let's say I have the following code:
class Context {
def compute() = Array(1.0)
}
val ctx = new Context
val data = ctx.compute
Now we are running this code in Spark:
val rdd = sc.parallelize(List(1,2,3))
rdd.map(_ + data(0)).count()
The code above throws org.apache.spark.SparkException: Task not serializable. I'm not asking how to fix it (by extending Serializable or making a case class); I want to understand why the error happens.
The thing I don't understand is why it complains about the Context class not being Serializable, even though it's not part of the lambda: rdd.map(_ + data(0)). Here data is an Array of values, which should be serialized, but it seems that the JVM captures the ctx reference as well, which, in my understanding, should not be happening.
As I understand it, in the shell Spark should clean the REPL context out of the lambda. If we print the tree after the delambdafy phase, we see these pieces:
object iw extends Object {
...
private[this] val ctx: $line11.iw$Context = _;
<stable> <accessor> def ctx(): $line11.iw$Context = iw.this.ctx;
private[this] val data: Array[Double] = _;
<stable> <accessor> def data(): Array[Double] = iw.this.data;
...
}
class anonfun$1 ... {
final def apply(x$1: Int): Double = anonfun$1.this.apply$mcDI$sp(x$1);
<specialized> def apply$mcDI$sp(x$1: Int): Double = x$1.+(iw.this.data().apply(0));
...
}
So the decompiled lambda code that is sent to the worker node is x$1.+(iw.this.data().apply(0)). The iw.this part belongs to the Spark shell session, so, as I understand it, it should be cleared by the ClosureCleaner, since it has nothing to do with the logic and shouldn't be serialized. In any case, calling iw.this.data() returns the Array[Double] value of the data variable, which is initialized in the constructor:
def <init>(): type = {
iw.super.<init>();
iw.this.ctx = new $line11.iw$Context();
iw.this.data = iw.this.ctx().compute(); // <== here
iw.this.res4 = ...
()
}
In my understanding, the ctx value has nothing to do with the lambda; it's not captured by the closure and hence shouldn't be serialized. What am I missing or misunderstanding?
This has to do with what Spark considers it can safely use as a closure. This is in some cases not very intuitive, since Spark uses reflection and in many cases can't recognize some of Scala's guarantees (it's not a full compiler) or the fact that some variables in the same object are irrelevant. For safety, Spark will attempt to serialize any objects referenced, which in your case includes iw (and, through it, the ctx field it holds), which is not serializable.
The code inside ClosureCleaner has a good example:
For instance, transitive cleaning is necessary in the following scenario:
class SomethingNotSerializable {
  def someValue = 1
  def scope(name: String)(body: => Unit) = body
  def someMethod(): Unit = scope("one") {
    def x = someValue
    def y = 2
    scope("two") { println(y + 1) }
  }
}
In this example, scope "two" is not serializable because it references scope "one", which references SomethingNotSerializable. Note that, however, the body of scope "two" does not actually depend on SomethingNotSerializable. This means we can safely null out the parent pointer of a cloned scope "one" and set it the parent of scope "two", such that scope "two" no longer references SomethingNotSerializable transitively.
Probably the easiest fix is to create a local variable, in the same scope, that extracts the value from your object, such that there is no longer any reference to the encapsulating object inside the lambda:
val rdd = sc.parallelize(List(1,2,3))
val data0 = data(0)
rdd.map(_ + data0).count()
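To see the same capture behaviour without Spark, here is a minimal sketch using plain Java serialization. Wrapper is a hypothetical stand-in for the REPL's iw object, and serializes is a helper invented for this example; neither comes from Spark. It assumes Scala 2.12 or later, where function literals are serializable as long as everything they capture is.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the REPL wrapper object `iw`: deliberately not Serializable.
class Wrapper {
  val data: Array[Double] = Array(1.0)

  // Reads the field through `this`, so the closure captures the whole Wrapper.
  def viaField: Int => Double = x => x + data(0)

  // Copies the value into a local first, so the closure captures only a Double.
  def viaLocal: Int => Double = {
    val data0 = data(0)
    x => x + data0
  }
}

object ClosureCaptureDemo {
  // Returns true if plain Java serialization accepts the object (illustrative helper).
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val w = new Wrapper
    println(s"viaField serializes: ${serializes(w.viaField)}")
    println(s"viaLocal serializes: ${serializes(w.viaLocal)}")
  }
}
```

The first closure should fail to serialize because writing it also means writing the enclosing Wrapper, while the second should succeed because it only carries the copied Double. That is the same reason the data0 workaround above helps in the shell.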