In Spark, how does one know which objects are instantiated on the driver and which are instantiated on the executors, and hence how does one determine which classes need to implement Serializable?
Serialization is used for performance tuning on Apache Spark. All data that is sent over the network or written to the disk or persisted in the memory should be serialized. Serialization plays an important role in costly operations. PySpark supports custom serializers for performance tuning.
Java serialization: By default, Spark serializes objects using Java's ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.
Spark provides two types of serialization libraries: Java serialization (the default) and Kryo serialization.
Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you'll use in the program in advance for best performance. So it is not used by default because: Not every java.io.
To serialize an object means to convert its state to a byte stream so that the byte stream can be reverted back into a copy of the object. A Java object is serializable if its class or any of its superclasses implements either the java.io.Serializable interface or its subinterface, java.io.Externalizable.
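The round trip described above can be sketched in plain Java, without Spark. This is only an illustration: GridPoint and RoundTripDemo are hypothetical names that do not appear in the original post, and an in-memory byte array stands in for the network or disk.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical example class; not part of the Spark sample in this post.
class GridPoint implements Serializable {
    private static final long serialVersionUID = 1L;
    final int x;
    final int y;

    GridPoint(int x, int y) {
        this.x = x;
        this.y = y;
    }
}

class RoundTripDemo {

    // Convert the object's state to a byte stream.
    static byte[] toBytes(Object obj) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Revert the byte stream back into a copy of the object.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        GridPoint original = new GridPoint(3, 4);
        GridPoint copy = (GridPoint) fromBytes(toBytes(original));
        // The copy carries the same state but is a different object.
        System.out.println(copy.x + "," + copy.y + " sameObject=" + (copy == original));
    }
}
```

The copy has the same field values as the original but is a distinct object, which is also what an executor receives from the driver.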
A class is never serialized; only an object of a class is serialized. Object serialization is needed if the object has to be persisted or transmitted over the network.
Class component | Serialized
Instance variable | yes
Static instance variable | no
Methods | no
Static methods | no
Static inner class | no
Local variables | no
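One way to check the table above is to ask the JDK which fields it would write for a class. The sketch below uses java.io.ObjectStreamClass for that; SampleState and SerializedFieldsDemo are hypothetical names introduced only for this illustration.

```java
import java.io.ObjectStreamClass;
import java.io.ObjectStreamField;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class mirroring the kinds of members listed in the table.
class SampleState implements Serializable {
    private static final long serialVersionUID = 1L;

    public int instanceVariable = 10;               // object state
    public static int staticInstanceVariable = 20;  // class state

    public int doubled() {                          // methods are code, not state
        int localVariable = 2;                      // lives only on the stack
        return instanceVariable * localVariable;
    }
}

class SerializedFieldsDemo {

    // Names of the fields that default Java serialization writes for the given type.
    // ObjectStreamClass.lookup returns null for a non-serializable class, so this
    // sketch assumes the type implements Serializable.
    static List<String> serializedFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        ObjectStreamClass descriptor = ObjectStreamClass.lookup(type);
        for (ObjectStreamField field : descriptor.getFields()) {
            names.add(field.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(serializedFieldNames(SampleState.class));
    }
}
```

Only instanceVariable is reported; the static fields, the method, and its local variable are not part of the serialized form.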
Let's take some sample Spark code and go through the various scenarios:
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

// Serializable because the anonymous class below captures the outer instance
public class SparkSample implements Serializable {

    // Placeholder values; the original post does not show how these are defined
    private static final String JOB_NAME = "SparkSample";
    private static final String OUTPUT_PATH = "output";
    private static final Pattern SPACE = Pattern.compile(" ");

    public int instanceVariable = 10;
    public static int staticInstanceVariable = 20;

    public void run(String inputPath) {
        int localVariable = 30;

        // create Spark conf
        final SparkConf sparkConf = new SparkConf()
                .setAppName(JOB_NAME)
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        // create Spark context
        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        // read data
        JavaRDD<String> lines = sparkContext.textFile(inputPath);

        // Anonymous class used in place of a lambda
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) {
                // How will the listed variables be accessed in the RDD across driver and executors?
                System.out.println("Output :" + instanceVariable + " " + staticInstanceVariable + " " + localVariable);
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });

        // save output
        words.saveAsTextFile(OUTPUT_PATH);
        sparkContext.stop();
    }

    // Static inner class for the functional interface; it can replace the anonymous class above
    public static class MapClass implements FlatMapFunction<String, String> {
        @Override
        public Iterator<String> call(String s) {
            // instanceVariable and localVariable are not accessible here, only the static variable
            System.out.println("Output :" + staticInstanceVariable);
            return Arrays.asList(SPACE.split(s)).iterator();
        }
    }

    public static void main(String[] args) throws Exception {
        SparkSample sample = new SparkSample();
        sample.run(args[0]);
    }
}
Accessibility and serializability of outer class variables from inside inner class objects:
Inner class | Instance Variable (Outer class) | Static Instance Variable (Outer class) | Local Variable (Outer class)
Anonymous class | Accessible and serialized | Accessible yet not serialized | Accessible and serialized
Inner static class | Not accessible | Accessible yet not serialized | Not accessible
Rule of thumb for understanding a Spark job:
All the lambda functions written inside RDD operations are instantiated on the driver, and the resulting objects are serialized and sent to the executors.
If any outer class instance variables are accessed within the inner class, the compiler applies different logic to access them, so whether the outer class gets serialized depends on what you access.
In terms of Java, the whole debate is about outer class vs. inner class and how accessing outer class references and variables leads to serialization issues.
Various scenarios:
Anonymous class accessing an instance variable of the outer class:
By default, the compiler inserts a constructor into the bytecode of the anonymous class that takes a reference to the outer class object.
The outer class object is used to access the instance variable.
Anonymous-class {
    final Outer-class reference;
    Anonymous-class(Outer-class outerReference) {
        reference = outerReference;
    }
}
The outer class object is serialized and sent along with the serialized object of the inner anonymous class.
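The effect of that hidden outer reference can be reproduced without Spark. In the sketch below, SerializableIntSupplier is a hypothetical stand-in for a Spark function interface such as FlatMapFunction (which is itself Serializable), and PlainOuter, SerializableOuter, and OuterCaptureDemo are names invented for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical stand-in for a serializable Spark function interface.
interface SerializableIntSupplier extends Supplier<Integer>, Serializable {}

// Deliberately NOT Serializable.
class PlainOuter {
    int instanceVariable = 10;

    SerializableIntSupplier reader() {
        return new SerializableIntSupplier() {
            @Override
            public Integer get() {
                // Read through the hidden reference to the enclosing PlainOuter.
                return instanceVariable;
            }
        };
    }
}

// Same shape, but the outer object is allowed to travel with the anonymous object.
class SerializableOuter implements Serializable {
    private static final long serialVersionUID = 1L;
    int instanceVariable = 10;

    SerializableIntSupplier reader() {
        return new SerializableIntSupplier() {
            @Override
            public Integer get() {
                return instanceVariable;
            }
        };
    }
}

class OuterCaptureDemo {

    // Returns null on success, or the name of the class that could not be serialized.
    static String firstNonSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstNonSerializable(new PlainOuter().reader()));
        System.out.println(firstNonSerializable(new SerializableOuter().reader()));
    }
}
```

The first call reports the outer class as the object that could not be serialized, even though only the anonymous object was passed in; the second call succeeds because the outer class implements Serializable.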
Anonymous class accessing a static variable of the outer class:
Static variables are not serialized, but the outer class object is still inserted into the anonymous class constructor.
The value of the static variable is taken from the class state present on that executor.
Anonymous class accessing a local variable of the enclosing method:
By default, the compiler inserts a constructor into the bytecode of the anonymous class that takes a reference to the outer class object AND a reference to the local variable.
The outer class object is used to access the instance variable.
Anonymous-class {
    final Outer-class reference;
    final Local-variable localReference;
    Anonymous-class(Outer-class outerReference, Local-variable localReference) {
        reference = outerReference;
        this.localReference = localReference;
    }
}
The outer class object is serialized, and the local variable object is also serialized and sent along with the serialized object of the inner anonymous class.
Because the local variable becomes an instance member inside the anonymous class, it needs to be serialized. From the outer class's perspective, a local variable can never be serialized.
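A small sketch of the local-variable case, again without Spark. The anonymous class is created in a static method so that the captured variable is its only state; SerializableTextSupplier and LocalCaptureDemo are hypothetical names used only here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical stand-in for a serializable Spark function interface.
interface SerializableTextSupplier extends Supplier<String>, Serializable {}

class LocalCaptureDemo {

    // Static method: there is no enclosing instance, so the only captured
    // state is the parameter, which becomes a field of the anonymous class.
    static SerializableTextSupplier describe(Object localVariable) {
        return new SerializableTextSupplier() {
            @Override
            public String get() {
                return "local=" + localVariable;
            }
        };
    }

    // True if Java serialization accepts the object and everything it references.
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // An Integer is Serializable, so the captured value can travel.
        System.out.println(canSerialize(describe(30)));
        // A plain Object is not, so serializing the anonymous object fails.
        System.out.println(canSerialize(describe(new Object())));
    }
}
```

Whether the anonymous object can be shipped therefore depends on the type of every local variable it captures, not only on the outer class.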
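Static inner class accessing an instance variable of the outer class: can't be accessed.
Static inner class accessing a local variable of the enclosing method: can't be accessed.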
Static inner class accessing a static variable of the outer class:
Static variables are not serialized, so no outer class object is serialized.
The value of the static variable is taken from the class state present on that executor.
The outer class is not serialized or sent along with the serialized static inner class.
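The static inner class case can be sketched the same way. Here the static field is changed between writing and reading to mimic an executor JVM whose class state differs from the driver's; JobOuter and StaticNestedDemo are hypothetical names, and a real executor would of course be a separate process.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Deliberately NOT Serializable.
class JobOuter {
    static int staticInstanceVariable = 20;
    int instanceVariable = 10;

    // Static inner class: holds no reference to any JobOuter instance.
    static class MapClass implements Serializable {
        private static final long serialVersionUID = 1L;

        int readStatic() {
            return staticInstanceVariable;
        }
    }
}

class StaticNestedDemo {

    static byte[] toBytes(Object obj) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Serialize with one static value, then read with another to mimic a
    // receiving JVM whose class state differs from the sender's.
    static int roundTripThenRead(int valueOnReceivingJvm) {
        JobOuter.staticInstanceVariable = 20;
        byte[] bytes = toBytes(new JobOuter.MapClass());
        JobOuter.staticInstanceVariable = valueOnReceivingJvm;
        JobOuter.MapClass copy = (JobOuter.MapClass) fromBytes(bytes);
        return copy.readStatic();
    }

    public static void main(String[] args) {
        System.out.println(roundTripThenRead(99));
    }
}
```

Serialization succeeds although JobOuter is not Serializable, and the deserialized copy sees whatever static value the receiving side holds rather than the value at the time of serialization.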
Points to think through:
Java serialization rules determine which class objects need to be serialized.
Use javap -p -c "abc.class" to disassemble the bytecode and see the compiler-generated code.
Depending on what you access from the outer class within the inner class, the compiler generates different bytecode.
Classes that are only accessed on the driver do not need to implement Serializable.
Any anonymous/static class (lambda functions are handled much like anonymous classes) used within an RDD operation is instantiated on the driver.
Any class/variable used inside an RDD operation is instantiated on the driver and sent to the executors.
Any instance variable declared transient is not serialized on the driver.
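The last point about transient can be illustrated with a short round trip. JobSettings and TransientDemo are hypothetical names, and the in-memory byte array again stands in for the driver-to-executor transfer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical example class with one regular and one transient field.
class JobSettings implements Serializable {
    private static final long serialVersionUID = 1L;

    String name = "job";                      // written to the stream
    transient String cache = "driver-only";   // skipped by serialization
}

class TransientDemo {

    // Serialize and immediately deserialize, as if sending to another JVM.
    static JobSettings roundTrip(JobSettings settings) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(settings);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (JobSettings) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        JobSettings copy = roundTrip(new JobSettings());
        System.out.println(copy.name + " " + copy.cache);
    }
}
```

After the round trip the transient field is null: it is not written, and the field initializer is not re-run when a Serializable object is deserialized, so code running on the receiving side has to rebuild such fields itself.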