I'm trying to run the following simple Spark code:
Gson gson = new Gson();
JavaRDD<String> stringRdd = jsc.textFile("src/main/resources/META-INF/data/supplier.json");
JavaRDD<SupplierDTO> rdd = stringRdd.map(new Function<String, SupplierDTO>()
{
private static final long serialVersionUID = -78238876849074973L;
@Override
public SupplierDTO call(String str) throws Exception
{
return gson.fromJson(str, SupplierDTO.class);
}
});
But it's throwing the following error while executing the stringRdd.map
statement:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
at org.apache.spark.rdd.RDD.map(RDD.scala:288)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
at org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:32)
at com.demo.spark.processor.cassandra.CassandraDataUploader.uploadData(CassandraDataUploader.java:71)
at com.demo.spark.processor.cassandra.CassandraDataUploader.main(CassandraDataUploader.java:47)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 7 more
Here 'jsc' is the JavaSparkContext
object I'm using.
As far as I know, JavaSparkContext
is not a Serializable
object and one should not use it inside any functions which will be sent to the Spark workers.
Now, what I'm not able to understand is, how the instance of JavaSparkContext
is being sent to the workers? What should I change in my code to avoid such scenario?
The gson
reference is 'pulling' the outer class into the scope of the closure, taking its full object graph with it.
In this case, create the gson object within the closure:
public SupplierDTO call(String str) throws Exception {
Gson gson = Gson();
return gson.fromJson(str, SupplierDTO.class);
}
You can also declare the spark context transient
If the creation of the Gson instance is costly, consider using mapPartitions
instead of map
.
For me I resolved this problem using one of the following choices:
transient
static Gson gson = new Gson();
Please refer to the doc Job aborted due to stage failure: Task not serializable
to see other available choices to resolve this probleme
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With