This is a working code example:
JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap);
messages.print();
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
I get the below error:
ERROR:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140)
at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46)
A non-serializable value is a complex object, like a class instance or a function. It is not an array, a plain serializable object, nor a primitive (like strings, numbers, booleans, null, etc.). Otherwise, it would be included in the list of the items that JSON supports.
Serializing a Scala object for JSON storage means converting the object to a string and then writing it out to disk. Start by creating a case class and instantiating an object.
Serialization is to convert an object to byte stream and the vice versa is for de-serialization. This is very helpful when you save object to disk and send them in network. These scenarios are commonly happen when we execute in distributed environments. As we know Apache spark works in distributed environments.
To make a Scala class serializable, extend the Serializable trait and add the @SerialVersionUID annotation to the class: @SerialVersionUID(100L) class Stock(var symbol: String, var price: BigDecimal) extends Serializable { // code here ... }
Since you're defining your map function using an anonymous inner class, the containing class must also be Serializable. Define your map function as a separate class or make it a static inner class. From the Java documentation (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):
Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.
just providing the code sample :
JavaDStream<String> lines = messages.map(mapFunc);
declare the inner class as a static variable :
static Function<Tuple2<String, String>, String> mapFunc=new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With