Spark serialization error mystery

Let's say I have the following code:

class Context {
  def compute() = Array(1.0)
}
val ctx = new Context
val data = ctx.compute

Now we are running this code in Spark:

val rdd = sc.parallelize(List(1,2,3))
rdd.map(_ + data(0)).count()

The code above throws org.apache.spark.SparkException: Task not serializable. I'm not asking how to fix it by extending Serializable or making it a case class; I want to understand why the error happens.

The thing I don't understand is why it complains about the Context class not being Serializable, even though it's not part of the lambda: rdd.map(_ + data(0)). data here is an Array of values which should be serialized, but it seems that the JVM captures the ctx reference as well, which, in my understanding, should not happen.
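This capture can be reproduced outside Spark and outside the REPL. Below is a minimal sketch (Holder and CaptureDemo are made-up names for illustration): a closure body that reads a field really compiles to this.field, so the closure object holds a reference to the whole enclosing instance, which is exactly the role iw plays in the shell.

// Minimal sketch: a closure that reads a field captures its enclosing instance.
class Holder {                                // deliberately NOT Serializable
  val data: Array[Double] = Array(1.0)
  val f: Int => Double = x => x + data(0)     // body is really x + this.data(0)
}

object CaptureDemo {
  def main(args: Array[String]): Unit = {
    val h = new Holder
    // List the fields the compiled closure actually captured; this typically
    // prints a single field of type Holder (e.g. arg$1 with Scala 2.12+ lambdas,
    // or $outer with the older anonymous-class encoding).
    h.f.getClass.getDeclaredFields.foreach(fld => println(s"${fld.getName}: ${fld.getType}"))
  }
}

Anything that serializes f therefore has to serialize the Holder it points to, whether or not Holder's other fields are ever used.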

As I understand it, in the shell Spark should clear the lambda of the REPL context. If we print the tree after the delambdafy phase, we see these pieces:

object iw extends Object {
  ... 
  private[this] val ctx: $line11.iw$Context = _;
  <stable> <accessor> def ctx(): $line11.iw$Context = iw.this.ctx;
  private[this] val data: Array[Double] = _;
  <stable> <accessor> def data(): Array[Double] = iw.this.data; 
  ...
}

class anonfun$1 ... {
  final def apply(x$1: Int): Double = anonfun$1.this.apply$mcDI$sp(x$1);
  <specialized> def apply$mcDI$sp(x$1: Int): Double = x$1.+(iw.this.data().apply(0));
  ...
}

So the decompiled lambda code that is sent to the worker node is x$1.+(iw.this.data().apply(0)). The iw.this part belongs to the Spark shell session, so, as I understand it, it should be cleared by the ClosureCleaner, since it has nothing to do with the logic and shouldn't be serialized. In any case, calling iw.this.data() returns the Array[Double] value of the data variable, which is initialized in the constructor:

def <init>(): type = {
  iw.super.<init>();
  iw.this.ctx = new $line11.iw$Context();
  iw.this.data = iw.this.ctx().compute(); // <== here
  iw.this.res4 = ...
  ()
}

In my understanding, the ctx value has nothing to do with the lambda; it's not part of the closure and hence shouldn't be serialized. What am I missing or misunderstanding?

asked Nov 14 '15 by 4lex1v

1 Answer

This has to do with what Spark considers it can safely use as a closure. This is in some cases not very intuitive, since Spark uses reflection and in many cases can't recognize some of Scala's guarantees (it is not a full compiler or anything) or the fact that some variables in the same object are irrelevant. For safety, Spark will attempt to serialize any object referenced, which in your case includes iw, which is not serializable.

The code inside ClosureCleaner has a good example:

For instance, transitive cleaning is necessary in the following scenario:

class SomethingNotSerializable {
  def someValue = 1
  def scope(name: String)(body: => Unit) = body
  def someMethod(): Unit = scope("one") {
    def x = someValue
    def y = 2
    scope("two") { println(y + 1) }
  }
}

In this example, scope "two" is not serializable because it references scope "one", which references SomethingNotSerializable. Note, however, that the body of scope "two" does not actually depend on SomethingNotSerializable. This means we can safely null out the parent pointer of a cloned scope "one" and set it as the parent of scope "two", such that scope "two" no longer references SomethingNotSerializable transitively.
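One way to see this transitive requirement concretely is to try Java serialization on a closure yourself. Below is a hypothetical helper (SerializationCheck is not part of Spark's API); it fails for any closure that transitively reaches a non-Serializable object, which is roughly the walk the default Java-based closure serializer performs.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper: attempt plain Java serialization on an object to see
// whether everything it transitively references is Serializable.
object SerializationCheck {
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}

In the shell session from the question, SerializationCheck.isSerializable((x: Int) => x + data(0)) would be expected to return false, because the closure still reaches iw and, through it, the non-serializable Context.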

Probably the easiest fix is to create a local variable, in the same scope, that extracts the value from your object, such that there is no longer any reference to the encapsulating object inside the lambda:

val rdd = sc.parallelize(List(1,2,3))
val data0 = data(0)
rdd.map(_ + data0).count()
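The same idea applies in compiled application code, not just the shell. Here is a hedged sketch (the Job class and its names are made up for illustration, reusing Context from the question): copying the needed element into a local val means the shipped closure captures a plain Double rather than the enclosing object graph.

import org.apache.spark.SparkContext

// Illustrative only: Job is a hypothetical wrapper around the question's Context.
class Job(sc: SparkContext) {
  val ctx = new Context                  // still not Serializable, and it never ships
  val data: Array[Double] = ctx.compute

  def run(): Long = {
    val first = data(0)                  // local copy, evaluated on the driver
    sc.parallelize(List(1, 2, 3)).map(_ + first).count()
  }
}

The count runs because nothing reachable from the map closure refers to Job, the REPL wrapper, or Context; only the captured Double is serialized.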
answered Nov 10 '22 by Daniel Langdon