 

Serializing RDD

I have an RDD that I am serializing and then trying to reconstruct by deserializing it. I want to see whether this is possible in Apache Spark.

    static JavaSparkContext sc = new JavaSparkContext(conf);
    static SerializerInstance si = SparkEnv.get().closureSerializer().newInstance();
    static ClassTag<JavaRDD<String>> tag = scala.reflect.ClassTag$.MODULE$.apply(JavaRDD.class);
    ..
    ..
    JavaRDD<String> rdd = sc.textFile(logFile, 4);
    System.out.println("Element 1 " + rdd.first());
    // Serialize the RDD handle itself, then reconstruct it from the bytes
    ByteBuffer bb = si.serialize(rdd, tag);
    JavaRDD<String> rdd2 = si.deserialize(bb, Thread.currentThread().getContextClassLoader(), tag);
    System.out.println(rdd2.partitions().size());
    System.out.println("Element 0 " + rdd2.first()); // exception is thrown here

I get an exception on the last line when I perform an action on the newly created RDD. The way I am serializing is similar to how it is done internally in Spark.

    Exception in thread "main" org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1177)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1189)
    at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:477)
    at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)
    at SimpleApp.sparkSend(SimpleApp.java:63)
    at SimpleApp.main(SimpleApp.java:91)

The RDD is created and loaded within the same process, so I don't understand how this error happens.


RagHaven


1 Answer

I'm the author of this warning message.

Spark does not support performing actions and transformations on copies of RDDs that are created via deserialization. RDDs are serializable so that certain methods on them can be invoked in executors, but end users shouldn't try to manually perform RDD serialization.
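
If the goal is simply to rebuild the same data as a new RDD, a more reliable route is to serialize the RDD's contents rather than the RDD handle itself. A minimal sketch, continuing from the snippet in the question (tmpPath is a placeholder output path):

    // Serialize the data, not the RDD handle. tmpPath is a placeholder path.
    JavaRDD<String> rdd = sc.textFile(logFile, 4);

    // Option 1: write the elements out and read them back as a fresh RDD.
    rdd.saveAsObjectFile(tmpPath);
    JavaRDD<String> restored = sc.objectFile(tmpPath);
    System.out.println("Element 0 " + restored.first());

    // Option 2: for small data sets, collect to the driver and re-parallelize.
    java.util.List<String> collected = rdd.collect();
    JavaRDD<String> rebuilt = sc.parallelize(collected, 4);
    System.out.println("Element 0 " + rebuilt.first());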

When an RDD is serialized, it loses its reference to the SparkContext that created it, preventing jobs from being launched with it (see here). In earlier versions of Spark, your code would result in a NullPointerException when Spark tried to access the private, null RDD.sc field.
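
The effect is the same as for any transient field under plain Java serialization. A minimal, Spark-free sketch of it (all class and field names here are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    // Transient fields are skipped during serialization, so the deserialized
    // copy comes back with null where the original held a live reference,
    // which is what happens to the RDD's SparkContext reference.
    class Handle implements Serializable {
        transient Object context = new Object();
    }

    public class TransientDemo {
        public static void main(String[] args) throws Exception {
            Handle original = new Handle();

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }

            Handle copy = (Handle) new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())).readObject();

            System.out.println(original.context != null); // true
            System.out.println(copy.context != null);     // false: the reference did not survive
        }
    }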

This error message was worded this way because users were frequently running into confusing NullPointerExceptions when trying to do things like rdd1.map { _ => rdd2.count() }, which caused actions to be invoked on deserialized RDDs on executor machines. I didn't anticipate that anyone would try to manually serialize / deserialize their RDDs on the driver, so I can see how this error message could be slightly misleading.
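
For reference, a Java sketch of that pattern (hypothetical rdd1 and rdd2, using the same sc as in the question); the nested count() only fails once the outer job actually runs on the executors:

    // Anti-pattern: referencing one RDD inside a transformation on another.
    // rdd2 is captured by the closure, serialized, and shipped to executors,
    // where the deserialized copy has no SparkContext, so count() cannot run.
    JavaRDD<Integer> rdd1 = sc.parallelize(java.util.Arrays.asList(1, 2, 3));
    JavaRDD<Integer> rdd2 = sc.parallelize(java.util.Arrays.asList(4, 5, 6));

    JavaRDD<Long> invalid = rdd1.map(x -> rdd2.count() * x);
    invalid.collect(); // the failure surfaces here, when the map is evaluated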


Josh Rosen