I am trying to understand how task serialization works in Spark and am a bit confused by some mixed results I'm getting in a test I've written.
I have some test code (simplified for the sake of the post) that does the following across more than one node:
object TestJob {
  def run(): Unit = {
    val rdd = ...
    val helperObject = new Helper() // Helper does NOT impl Serializable and is a vanilla class
    rdd.map(element => {
      helperObject.transform(element)
    }).collect()
  }
}
When I execute run(), the job bombs out with a "task not serializable" exception, as expected, since helperObject is not serializable. HOWEVER, when I alter it a little, like this:
trait HelperComponent {
  val helperObject = new Helper()
}

object TestJob extends HelperComponent {
  def run(): Unit = {
    val rdd = ...
    rdd.map(element => {
      helperObject.transform(element)
    }).collect()
  }
}
The job executes successfully for some reason. Could someone help me to understand why this might be? What exactly gets serialized by Spark and sent to the workers in each case above?
I am using Spark version 2.1.1.
Thank you!
Could someone help me to understand why this might be?
In your first snippet, helperObject is a local variable declared inside run. As such, it is closed over (lifted) by the function, so that wherever the closure executes, everything it needs is available; because of that, Spark's ClosureCleaner yells at you for trying to serialize it.
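As an aside, here is a minimal sketch of two common ways to make your first snippet work. The Helper body, the SparkSession setup and the local master are illustrative stand-ins, not taken from your code:

import org.apache.spark.sql.SparkSession

// Illustrative stand-in for your Helper class.
class Helper extends Serializable {
  def transform(s: String): String = s.toUpperCase
}

object TestJobWorkarounds {
  def run(): Unit = {
    // Local master only so the sketch runs standalone.
    val spark = SparkSession.builder().appName("serialization-demo").master("local[2]").getOrCreate()
    val rdd = spark.sparkContext.parallelize(Seq("a", "b", "c"))

    // Workaround 1: mark Helper as Serializable (as above), so the closure
    // and the captured helperObject can be shipped to the executors.
    val helperObject = new Helper()
    rdd.map(element => helperObject.transform(element)).collect()

    // Workaround 2: construct the helper on the executor side, once per
    // partition, so nothing outside the closure needs to be serialized.
    rdd.mapPartitions { iter =>
      val localHelper = new Helper()
      iter.map(localHelper.transform)
    }.collect()
  }
}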
In your second snippet, the value is no longer a local variable in the method scope; it is part of the class instance (technically, this is an object declaration, but it is compiled down to a JVM class after all).
This is meaningful in Spark because all worker nodes in the cluster already contain the JARs needed to execute your code. Thus, instead of serializing TestJob in its entirety for rdd.map, when Spark spins up an executor process on one of your workers, it loads TestJob locally via a ClassLoader and creates an instance of it, just like every other JVM class in a non-distributed application.
To conclude, the reason you don't see this blow up is that the object is no longer serialized at all; because of the way you've declared it, each JVM (driver and executors alike) initializes its own copy locally.
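If you want to see this for yourself, here is a rough sketch you could submit to a cluster; the println instrumentation and the hostname lookup are mine, purely for illustration. You should see one "Helper created" line in the driver's stdout and one in each executor's stdout, each with a different identity hash, confirming that every JVM builds its own copy rather than receiving a serialized one:

import java.net.InetAddress
import org.apache.spark.sql.SparkSession

class Helper {
  // This constructor runs once per JVM that initializes the TestJob singleton:
  // once on the driver, and once in each executor process that runs a task.
  println(s"Helper created on ${InetAddress.getLocalHost.getHostName}, identity=${System.identityHashCode(this)}")
  def transform(s: String): String = s
}

trait HelperComponent {
  val helperObject = new Helper()
}

object TestJob extends HelperComponent {
  def run(): Unit = {
    val spark = SparkSession.builder().appName("classloading-demo").getOrCreate()
    val rdd = spark.sparkContext.parallelize(Seq("a", "b", "c"), numSlices = 2)

    // The lambda refers to helperObject through the TestJob singleton, which each
    // executor resolves via its own ClassLoader; no Helper instance is serialized
    // and shipped from the driver.
    rdd.map(element => helperObject.transform(element)).collect()
  }
}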