 

Understanding Spark's closures and their serialization

Disclaimer: just starting to play with Spark.

I'm having trouble understanding the famous "Task not serializable" exception, but my question is a little different from those I see on SO (or so I think).

I have a tiny custom RDD (TestRDD). It has a field which stores objects whose class does not implement Serializable (NonSerializable). I've set the "spark.serializer" config option to use Kryo. However, when I call count() on my RDD, I get the following:

Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)

When I look inside DAGScheduler.submitMissingTasks, I see that it uses its closure serializer on my RDD, and that is the Java serializer, not the Kryo serializer I'd expect. I've read that Kryo has issues serializing closures and that Spark always uses the Java serializer for closures, but I don't quite understand how closures come into play here at all. All I'm doing is this:

SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext sc = new JavaSparkContext(conf);

TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());

That is, no mappers or anything which would require serialization of closures. OTOH this works:

sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count();

The Kryo serializer is used as expected, the closure serializer is not involved. If I didn't set the serializer property to Kryo, I'd get an exception here as well.

I'd appreciate any pointers explaining where the closure comes from and how to ensure that I can use Kryo to serialize custom RDDs.

UPDATE: here's TestRDD with its non-serializable field mNS:

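import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.Dependency;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;

import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.reflect.ClassManifestFactory$;
import scala.reflect.ClassTag;

class TestRDD extends RDD<String> {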

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    NonSerializable mNS = new NonSerializable();

    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }

    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }

    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }

    static class TestPartition implements Partition {

        final int mIndex;

        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }

        public int index() {
            return mIndex;
        }
    }
}
asked Oct 26 '16 09:10 by Pavel Klinov



1 Answer

When I look inside DAGScheduler.submitMissingTasks I see that it uses its closure serializer on my RDD, which is the Java serializer, not the Kryo serializer which I'd expect.

SparkEnv holds two serializers. The first, serializer, is used to serialize your data, for checkpointing, for messaging between workers, etc., and is configured with the spark.serializer setting. The second, closureSerializer, configured with spark.closure.serializer, is used to check that your objects are in fact serializable and to serialize the tasks themselves. It is configurable in Spark <= 1.6.2 (though nothing other than JavaSerializer actually works) and is hardcoded to JavaSerializer from 2.0.0 onward.
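Because the closure serializer is plain Java serialization, it fails on any non-Serializable object reachable from what it is asked to serialize. The stdlib-only sketch below reproduces the shape of the failure in the question's stack trace without Spark on the classpath. NonSerializableResource, FakeRdd, FakeFunc and FakeTaskBinary are hypothetical stand-ins for NonSerializable, TestRDD, the <function2> and the scala.Tuple2 respectively; it is an illustration of java.io behavior, not Spark's own code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-ins for the classes in the question; no Spark is needed to run this.
class ClosureSerializationDemo {

    // Plays the role of NonSerializable: it does NOT implement Serializable.
    static class NonSerializableResource {}

    // Plays the role of TestRDD. Spark's RDD implements Serializable, so this does too.
    static class FakeRdd implements Serializable {
        private static final long serialVersionUID = 1L;
        NonSerializableResource mNS = new NonSerializableResource();
    }

    // Plays the role of the <function2> in the stack trace; it serializes fine on its own.
    static class FakeFunc implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Plays the role of the scala.Tuple2 (rdd, func) handed to the closure serializer.
    static class FakeTaskBinary implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object rdd;
        final Object func;

        FakeTaskBinary(Object rdd, Object func) {
            this.rdd = rdd;
            this.func = func;
        }
    }

    // Serializes obj with plain java.io serialization and returns the name of the class
    // that could not be serialized, or null if the whole object graph serialized fine.
    static String findNonSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // by default the message is the offending class name
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FakeTaskBinary task = new FakeTaskBinary(new FakeRdd(), new FakeFunc());
        // Prints ClosureSerializationDemo$NonSerializableResource
        System.out.println(findNonSerializable(task));
    }
}
```

Note that the failure comes from a field of the RDD-like object, not from the function: Java serialization walks every non-transient field it can reach, which is exactly what the "field (class: com.test.spark.TestRDD, name: mNS ...)" line of the serialization stack reports.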

Using Kryo as the closure serializer is broken by a bug that makes it unusable; you can see it under SPARK-7708. This may be fixed with Kryo 3.0.0, but Spark is currently pinned to a specific version of Chill, which in turn is pinned to Kryo 2.2.1. Furthermore, since Spark 2.0.x the closure serializer is hardcoded to JavaSerializer instead of being configurable (you can see it in this pull request). This means that we're effectively stuck with JavaSerializer for closure serialization.

Is it weird that we're using one serializer to submit tasks and another to serialize data between workers and such? Definitely, but this is what we have.

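To sum up: if you set the spark.serializer configuration or use SparkConf.registerKryoClasses, Kryo will be used for most of your serialization in Spark. Having said that, to check whether a given class is serializable and to serialize tasks sent to workers, Spark will use JavaSerializer. This is also where the "closure" in your stack trace comes from: for a result stage, DAGScheduler.submitMissingTasks serializes the pair (stage.rdd, stage.func) with the closure serializer, which is the scala.Tuple2 holding your TestRDD and a <function2>. Because the RDD object itself is part of every task, all of TestRDD's fields, including mNS, must be Java-serializable.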
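Since the RDD object is Java-serialized as part of each task, one common workaround (not part of the answer above, and only a sketch) is to mark a non-serializable field transient and create it lazily where it is used, for example from compute(), which runs on the executor after the task has been deserialized. The stdlib-only example below shows the mechanics with hypothetical classes: NonSerializableResource stands in for NonSerializable, LazyResourceHolder stands in for TestRDD, and roundTrip mimics the driver-to-executor trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientFieldDemo {

    // Hypothetical stand-in for NonSerializable: it does NOT implement Serializable.
    static class NonSerializableResource {
        String describe() {
            return "resource ready";
        }
    }

    // Hypothetical stand-in for TestRDD. The transient field is skipped by Java
    // serialization and re-created lazily on first use after deserialization.
    static class LazyResourceHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient NonSerializableResource mNS;

        NonSerializableResource resource() {
            if (mNS == null) {
                mNS = new NonSerializableResource();
            }
            return mNS;
        }

        boolean hasResource() {
            return mNS != null;
        }
    }

    // Java-serializes and deserializes obj, mimicking how a task travels to an executor.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazyResourceHolder driverSide = new LazyResourceHolder();
        driverSide.resource(); // populate the field on the "driver"
        LazyResourceHolder executorSide = roundTrip(driverSide);
        System.out.println(executorSide.hasResource());         // false: field was not serialized
        System.out.println(executorSide.resource().describe()); // re-created on first use
    }
}
```

The lazy accessor matters because a field initializer such as the question's `NonSerializable mNS = new NonSerializable();` is not re-run when a Serializable object is deserialized, so a plain transient field would simply arrive as null on the executor.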

answered Oct 04 '22 21:10 by Yuval Itzchakov
