I've been looking at the documentation for Spark and it mentions this:
Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:
- Anonymous function syntax, which can be used for short pieces of code.
- Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:
object MyFunctions { def func1(s: String): String = { ... } }
myRdd.map(MyFunctions.func1)
Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:
class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
Here, if we create a new MyClass and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing
rdd.map(x => this.func1(x)).
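The usual way to avoid shipping the whole instance is to copy whatever the closure needs into a local variable inside the method, so the closure captures only that local value rather than `this`. A minimal sketch of the idea, using a plain Seq in place of an RDD so it is self-contained (the names are illustrative):

```scala
class MyClass extends Serializable {
  val suffix = "!"

  // Captures `this`: the closure reads the instance field directly,
  // so the whole MyClass instance would be serialized with it.
  def doStuffCapturing(data: Seq[String]): Seq[String] =
    data.map(s => s + suffix)

  // Captures only a local copy: the closure closes over `localSuffix`,
  // a plain String, so `this` is not dragged along.
  def doStuffLocal(data: Seq[String]): Seq[String] = {
    val localSuffix = suffix
    data.map(s => s + localSuffix)
  }
}
```

With a real RDD the same field-copy trick keeps the closure's serialized footprint down to just the values it actually uses.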
Now my question is: what happens if the singleton object has attributes (which are supposed to be the equivalent of static fields)? Same example with a small alteration:
object MyClass {
  val value = 1
  def func1(s: String): String = { s + value }
}
myRdd.map(MyClass.func1)
So the function is still referenced statically, but how far does Spark go when trying to serialize referenced variables? Will it serialize value, or will it be initialized again on the remote workers?
Additionally, for context: I have some heavy models inside a singleton object, and I would like to find the correct way to get them to the workers while keeping the ability to reference them through the singleton everywhere, rather than passing them around as function parameters through a fairly deep call stack.
Any in-depth information on what/how/when Spark serializes things would be appreciated.
This is less a question about Spark and more a question of how Scala generates code. Remember that a Scala object is pretty much a Java class full of static methods. Consider a simple example like this:
object foo {
  val value = 42
  def func(i: Int): Int = i + value
  def main(args: Array[String]): Unit = {
    println(Seq(1, 2, 3).map(func).sum)
  }
}
That will be translated to 3 Java classes; one of them will be the closure that is a parameter to the map
method. Using javap
on that class yields something like this:
public final class foo$$anonfun$main$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;
  public final int apply(int);
  public int apply$mcII$sp(int);
  public final java.lang.Object apply(java.lang.Object);
  public foo$$anonfun$main$1();
}
Note there are no fields or anything. If you look at the disassembled bytecode, all it does is call the func()
method. When running in Spark, this is the instance that will get serialized; since it has no fields, there's not much to be serialized.
As for your question about how to initialize the singleton's state: you can have an idempotent initialization function that you call at the start of your closures. The first call triggers the initialization; subsequent calls are no-ops. Cleanup, though, is a lot trickier, since I'm not aware of an API that does something like "run this code on all executors".
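A minimal sketch of that idempotent-initialization pattern (self-contained Scala with a plain Seq standing in for the RDD; the names are illustrative): put the expensive state behind a lazy val, which is evaluated at most once per JVM, and expose an init function that every closure can call safely.

```scala
import java.util.concurrent.atomic.AtomicInteger

object HeavyModel {
  // Counts how many times the expensive load actually ran (for illustration).
  val loads = new AtomicInteger(0)

  // A lazy val is initialized at most once per JVM, on first access.
  // Each executor JVM re-runs this independently; nothing is serialized.
  lazy val model: Map[String, Double] = {
    loads.incrementAndGet()
    Map("weight" -> 0.5) // stand-in for an expensive model load
  }

  // Idempotent init: safe to call at the start of every closure.
  def ensureInitialized(): Unit = { model; () }
}

object Scoring {
  // In Spark this would be rdd.map { x => HeavyModel.ensureInitialized(); ... }
  def score(xs: Seq[String]): Seq[Double] =
    xs.map { x =>
      HeavyModel.ensureInitialized()
      HeavyModel.model.getOrElse(x, 0.0)
    }
}
```

Because the closure only references the static singleton (just like the func example above), the model itself is never serialized from the driver; each worker JVM builds its own copy on first use.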
One approach that can be useful if you need cleanup is explained in this blog, in the "setup() and cleanup()" section.
EDIT: just for clarification, here's the disassembly of the method that actually makes the call.
public int apply$mcII$sp(int);
Code:
0: getstatic #29; //Field foo$.MODULE$:Lfoo$;
3: iload_1
4: invokevirtual #32; //Method foo$.func:(I)I
7: ireturn
See how it just references the static field holding the singleton and calls the func()
method.