
org.apache.spark.SparkException: Task not serializable

Here is the code that triggers the error:

JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap);
messages.print();
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

I get the following error:

ERROR:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140)
    at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46)
xiaolong li asked Mar 27 '15 08:03



2 Answers

Because you define your map function as an anonymous inner class, it holds an implicit reference to the enclosing instance, so the containing class must also be Serializable. To avoid this, define your map function as a separate top-level class or as a static nested class. From the Java serialization specification (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):

Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.
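The effect described in that note can be reproduced with plain Java serialization, without Spark. The following is only a sketch: `MapFunction`, `Driver`, and `TrimMapper` are hypothetical names, and `MapFunction` merely stands in for Spark's serializable `Function` interface. It tries to serialize an anonymous class created inside an instance method of a non-serializable class, and then a static nested class with the same logic.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationDemo {

    // Hypothetical stand-in for Spark's Function interface (assumption: like
    // the real one, it extends Serializable).
    interface MapFunction<T, R> extends Serializable {
        R call(T input);
    }

    // Plays the role of the driver class; note that it is NOT Serializable.
    static class Driver {
        // The anonymous class is created in an instance method, so it carries
        // an implicit reference to the enclosing Driver instance.
        MapFunction<String, String> anonymousMapper() {
            return new MapFunction<String, String>() {
                @Override
                public String call(String s) {
                    return s.trim();
                }
            };
        }
    }

    // Static nested class: same logic, but no reference to any outer instance.
    static class TrimMapper implements MapFunction<String, String> {
        @Override
        public String call(String s) {
            return s.trim();
        }
    }

    // Returns true if Java serialization accepts the object, false if it
    // fails with NotSerializableException.
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous inner class: " + canSerialize(new Driver().anonymousMapper()));
        System.out.println("static nested class:   " + canSerialize(new TrimMapper()));
    }
}
```

In this sketch the anonymous mapper is rejected because serializing it also tries to serialize the `Driver` instance, while the static nested class is accepted. In the Spark case, the equivalent of `TrimMapper` would be a static nested class implementing `Function<Tuple2<String, String>, String>`.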

InPursuit answered Nov 14 '22 20:11


Just providing a code sample:

JavaDStream<String> lines = messages.map(mapFunc);

Declare the function as a static field, so that the anonymous class is created in a static context and captures no enclosing instance:

static Function<Tuple2<String, String>, String> mapFunc = new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
};
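To check that a function declared this way survives being shipped around, a rough stand-alone sketch (no Spark involved) can serialize it to bytes and read it back, which is approximately what happens when a task is sent to an executor. The names `MapFunction`, `SECOND_OF_PAIR`, and `roundTrip` are hypothetical, and `Map.Entry` is used here as a stand-in for `Tuple2`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.Map;

public class StaticFieldDemo {

    // Hypothetical stand-in for Spark's serializable Function interface.
    interface MapFunction<T, R> extends Serializable {
        R call(T input);
    }

    // Anonymous class assigned to a static field: it is created in a static
    // context, so there is no enclosing instance to drag along.
    static final MapFunction<Map.Entry<String, String>, String> SECOND_OF_PAIR =
            new MapFunction<Map.Entry<String, String>, String>() {
                @Override
                public String call(Map.Entry<String, String> pair) {
                    return pair.getValue(); // analogous to tuple2._2()
                }
            };

    // Serializes the value to a byte array and deserializes a fresh copy.
    public static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        MapFunction<Map.Entry<String, String>, String> copy = roundTrip(SECOND_OF_PAIR);
        System.out.println(copy.call(new AbstractMap.SimpleEntry<>("key", "value")));
    }
}
```

The deserialized copy behaves like the original function, which is the property Spark needs from anything passed to `map`.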
udyan answered Nov 14 '22 22:11