
How does Spark handle Scala objects?

To test serialization exceptions in Spark, I wrote the same task in two ways.
First way:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)
    val result = rdd.map(elem => {
      funcs.func_1(elem)
    })        
    println(result.count())
  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}

Written this way, the Spark job runs fine.
But when I change it to the following, it does not work and throws NotSerializableException.
Second way:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)

    val handler = funcs
    val result = rdd.map(elem => {
      handler.func_1(elem)
    })

    println(result.count())

  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}

I know that I get the "Task not serializable" error in the second example because I am trying to send the non-serializable object funcs from the driver node to a worker node. If I make object funcs extend Serializable in the second example, the error goes away.

But in my view, because funcs is an object rather than a class, it is a singleton and is supposed to be serialized and shipped from the driver to the workers rather than instantiated on the worker node itself. So although funcs is used differently in the two examples, I would guess that the non-serializable object funcs is shipped from the driver node to the worker node in both of them.

My question is: why does the first example run successfully while the second one fails with the "Task not serializable" exception?

Frankie asked Nov 14 '16 20:11




2 Answers

When you run code in an RDD closure (map, filter, etc...), everything necessary to execute that code will be packaged up, serialized, and sent to the executors to be run. Any objects that are referenced (or whose fields are referenced) will be serialized in this task, and this is where you'll sometimes get a NotSerializableException.
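The capture rule can be seen without a cluster. The following is a minimal sketch that uses plain Java serialization rather than Spark itself; `Lookup`, `CaptureDemo` and `canSerialize` are hypothetical names introduced only for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper class; it deliberately does NOT extend Serializable.
class Lookup(val offset: Int)

object CaptureDemo {
  // Returns true if Java serialization accepts `obj`,
  // false if it runs into a non-serializable object.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures only an Int, so nothing problematic is in its object graph.
  def plainClosure(): Int => Int = {
    val offset = 1
    elem => elem + offset
  }

  // Captures a Lookup instance, which is not Serializable.
  def capturingClosure(): Int => Int = {
    val lookup = new Lookup(1)
    elem => elem + lookup.offset
  }

  def main(args: Array[String]): Unit = {
    println(s"plain closure serializable:     ${canSerialize(plainClosure())}")
    println(s"capturing closure serializable: ${canSerialize(capturingClosure())}")
  }
}
```

The second closure is rejected because serializing it also means serializing the `Lookup` instance it refers to.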

Your use case is a little more complicated, though, and involves the Scala compiler. Typically, calling a method on a Scala object is the equivalent of calling a Java static method: the call is resolved through the singleton's static reference, so the closure does not need to capture anything, and each JVM initializes its own copy of the object on first use. However, if you assign the object to a variable, you are creating an ordinary reference to the singleton instance. A closure that uses that variable captures the instance, which then behaves like an instance of any other class and can have serialization issues.

scala> object A { 
  def foo() { 
    println("bar baz")
  }
}
defined module A

scala> A.foo()  // static method
bar baz

scala> val a = A  // now we're actually assigning a memory location
a: A.type = A$@7e0babb1

scala> a.foo()  // dereferences a before calling foo
bar baz
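
The same difference shows up once closures are involved. Below is a minimal sketch, assuming Scala 2.12 or 2.13 with default compiler options and plain Java serialization instead of Spark. `Funcs` mirrors `funcs` from the question; `SingletonCaptureDemo` and `canSerialize` are hypothetical helpers.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Mirrors `funcs` from the question; not Serializable.
object Funcs {
  def func1(i: Int): Int = i + 1
}

object SingletonCaptureDemo {
  // Returns true if Java serialization accepts `obj`.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Refers to the singleton by name: the call goes through the object's
  // static reference, so the closure captures nothing.
  def direct(): Int => Int = elem => Funcs.func1(elem)

  // Goes through a local val: the closure captures the value of `handler`,
  // which is the Funcs instance.
  def viaHandler(): Int => Int = {
    val handler = Funcs
    elem => handler.func1(elem)
  }

  def main(args: Array[String]): Unit = {
    println(s"direct reference serializable: ${canSerialize(direct())}")
    println(s"via local val serializable:    ${canSerialize(viaHandler())}")
  }
}
```

Under those assumptions, the first closure should serialize and the second should not, because only the second carries a reference to the `Funcs` instance.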
Tim answered Oct 27 '22 01:10



In order for Spark to distribute a given operation, the function used in the operation needs to be serialized. Before serialization, these functions pass through a complex process appropriately called "ClosureCleaner".

The intention is to "cut off" closures from their context in order to reduce the size of the object graph that needs to be serialized and to reduce the risk of serialization issues in the process. In other words, it ensures that only the code needed to execute the function is serialized and sent for deserialization and execution "at the other side".

During that process, the closure is also checked for serializability, so that serialization issues are detected proactively, before the task is shipped (SparkContext#clean).
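
The idea of an eager check can be approximated as follows. This is a sketch of the concept only, not Spark's actual implementation: `EagerCheck`, `ensureSerializable` and `TaskNotSerializable` are hypothetical names, and plain Java serialization stands in for Spark's closure serializer.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical exception type standing in for the error Spark reports.
final class TaskNotSerializable(cause: Throwable)
  extends RuntimeException("Task not serializable", cause)

object EagerCheck {
  // Try to serialize the closure immediately and fail fast if that is not
  // possible; otherwise hand the closure back unchanged.
  def ensureSerializable[F <: AnyRef](closure: F): F = {
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(closure) finally out.close()
    } catch {
      case e: NotSerializableException => throw new TaskNotSerializable(e)
    }
    closure
  }

  // Captures nothing, so the check passes.
  def safeClosure(): Int => Int = elem => elem + 1

  // Captures a plain java.lang.Object, which is not Serializable.
  def unsafeClosure(): Int => Int = {
    val token = new Object
    elem => elem + token.hashCode
  }

  def main(args: Array[String]): Unit = {
    val f = ensureSerializable(safeClosure())
    println(f(1))
    try ensureSerializable(unsafeClosure())
    catch { case e: TaskNotSerializable => println(s"rejected: ${e.getCause}") }
  }
}
```

Checking at the point where the closure is handed over means the error surfaces where the closure was written, rather than later when a task is sent out.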

The ClosureCleaner code is dense and complex, so it's hard to find the exact code path leading to this case.

Intuitively, what's happening is that when the ClosureCleaner finds:

val result = rdd.map{elem => 
  funcs.func_1(elem)
} 

It determines that the closure only uses members of an object that can be recreated on the other side and that there are no further references, so the cleaned closure contains only {elem => funcs.func_1(elem)}, which can be serialized by the JavaSerializer.

Instead, when the closure cleaner evaluates:

val handler = funcs
val result = rdd.map(elem => {
  handler.func_1(elem)
})

It finds that the closure has a reference to $outer (handler), hence it inspects the outer scope and adds the referenced variable instance to the cleaned closure. We could imagine the resulting cleaned closure to be something of this shape (this is for illustrative purposes only):

{elem => 
  val handler = funcs
  handler.func_1(elem)
} 

When the closure is tested for serialization, it fails. Per JVM serialization rules, an object is serializable only if its class implements Serializable and, recursively, all of its non-transient members are serializable. In this case, handler references a non-serializable object and the check fails.
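
One fix, which the question already mentions, is to make the singleton serializable. The sketch below uses plain Java serialization rather than Spark; `SerializableFuncs`, `FixDemo` and `canSerialize` are hypothetical names for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Same shape as `funcs` in the question, but marked Serializable.
object SerializableFuncs extends Serializable {
  def func1(i: Int): Int = i + 1
}

object FixDemo {
  // Returns true if Java serialization accepts `obj`.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The closure still goes through a local val, but whatever it captures
  // is now serializable.
  def viaHandler(): Int => Int = {
    val handler = SerializableFuncs
    elem => handler.func1(elem)
  }

  def main(args: Array[String]): Unit =
    println(s"via local val serializable: ${canSerialize(viaHandler())}")
}
```

The other option is the first form from the question: refer to the object by name inside the closure, so that there is no captured reference to serialize.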

maasg answered Oct 26 '22 23:10
