When I write an RDD transformation, e.g.
val rdd = sc.parallelize(1 to 1000)
rdd.map(x => x * 3)
I understand that the closure (x => x * 3), which is simply a Function1, needs to be Serializable, and I think I read somewhere (EDIT: it's implied right there in the documentation: http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark) that it is "sent" to the workers for execution (e.g. Akka sending an "executable piece of code" down the wire to workers to run).
Is that how it works?
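For reference, the Serializable requirement itself is easy to observe. Below is a minimal sketch (the Helper class and all names are made up for illustration): capturing a non-serializable object in the closure makes the job fail on the driver with "Task not serializable".

import org.apache.spark.{SparkConf, SparkContext}

// Deliberately NOT Serializable, just to trigger the failure.
class Helper {
  def triple(x: Int): Int = x * 3
}

object SerializableCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("closure-check").setMaster("local[*]"))
    val helper = new Helper
    val rdd = sc.parallelize(1 to 1000)
    // The closure captures `helper`, so this throws
    // org.apache.spark.SparkException: Task not serializable on the driver.
    rdd.map(x => helper.triple(x)).count()
    sc.stop()
  }
}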
Someone at a meetup I attended commented that it is not actually sending any serialized code; since each worker gets a "copy" of the jar anyway, it just needs a reference to which function to run, or something like that (I'm not sure I'm quoting that person correctly).
I'm now utterly confused about how it actually works.
So my questions are:
How are transformation closures sent to workers? Serialized via Akka? Or are they "already there" because Spark sends the entire uber jar to each worker (sounds unlikely to me...)?
If so, then how is the rest of the jar sent to the workers? Is this what the "ClosureCleaner" is doing, e.g. sending only the bytecode relevant to the closure to the worker instead of the entire uber jar (i.e. only the code the closure depends on)?
So to summarise: does Spark, at any point, sync the jars on the --jars classpath with the workers somehow? Or does it send "just the right amount" of code to workers? And if it does send closures, are they cached in case of recomputation, or is the closure sent with the task every time a task is scheduled? Sorry if these are silly questions, but I really don't know.
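To be concrete about what I mean by the --jars classpath, this is roughly how I pass dependency jars today (the jar path is made up; as far as I understand, the spark.jars setting is equivalent to spark-submit's --jars flag):

import org.apache.spark.{SparkConf, SparkContext}

// Dependencies are given as a comma-separated list of jar paths.
val conf = new SparkConf()
  .setAppName("closure-question")
  .set("spark.jars", "/path/to/my-deps.jar")
val sc = new SparkContext(conf)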
Please add sources for your answer if you can; I couldn't find this stated explicitly in the documentation, and I'm too wary to try to conclude it just by reading the code.
The closures are most certainly serialized at runtime; I have seen plenty of "Task not serializable" exceptions at runtime, from both PySpark and Scala. Spark has complex code that handles this.
From ClosureCleaner.scala:
def clean(
    closure: AnyRef,
    checkSerializable: Boolean = true,
    cleanTransitively: Boolean = true): Unit = {
  clean(closure, checkSerializable, cleanTransitively, Map.empty)
}
This method attempts to minify the code being serialized. The code is then sent across the wire, provided it is serializable; otherwise an exception is thrown.
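As a rough illustration of the "sent across the wire" part (this uses plain Java serialization standing in for Spark's closure serializer, and the object and value names below are mine), a simple Scala closure can be round-tripped through bytes like this:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureRoundTrip {
  def main(args: Array[String]): Unit = {
    val triple: Int => Int = x => x * 3    // the closure from the question

    // Serialize the Function1 to bytes (conceptually, what travels with the task).
    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(triple)

    // "Receive" the bytes and run the function, as an executor conceptually would.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val revived = in.readObject().asInstanceOf[Int => Int]
    println(revived(14))                   // prints 42
  }
}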
Here is another excerpt from ClosureCleaner that checks whether an incoming function can be serialized:
private def ensureSerializable(func: AnyRef) {
  try {
    if (SparkEnv.get != null) {
      SparkEnv.get.closureSerializer.newInstance().serialize(func)
    }
  } catch {
    case ex: Exception => throw new SparkException("Task not serializable", ex)
  }
}
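As a practical aside (a sketch of the common workaround, not an excerpt from Spark): the usual way to keep ensureSerializable happy is to make sure the closure only captures serializable values, for example by copying a field into a local val instead of dragging the enclosing object along:

import org.apache.spark.rdd.RDD

class Multiplier(factor: Int) {
  def apply(rdd: RDD[Int]): RDD[Int] = {
    // Copy the needed value to a local; the closure then captures only an Int
    // instead of `this`, which may not be serializable.
    val f = factor
    rdd.map(x => x * f)
  }
}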