Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

I am creating avro RDD with following code.

 def convert2Avro(data : String ,schema : Schema)  : AvroKey[GenericRecord] = {
   var wrapper = new AvroKey[GenericRecord]()
   var record = new GenericData.Record(schema)
   record.put("empname","John")
    wrapper.datum(record)
    return wrapper 
  }

and creating avro RDD as follows.

 var avroRDD = fieldsRDD.map(x =>(convert2Avro(x, schema)))

while executing, I am getting following exception in above line

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

any pointer?

like image 355
Ramakrishna Avatar asked Feb 09 '15 15:02

Ramakrishna


1 Answers

Schema.ReocrdSchema class has not implemented serializable. So it could not transferred over the network. We can convert the schema to string and pass to method and inside the method reconstruct the schema object.

var schemaString = schema.toString
var avroRDD = fieldsRDD.map(x =>(convert2Avro(x, schemaString)))

Inside the method reconstruct the schema:

def convert2Avro(data : String ,schemaString : String)  : AvroKey[GenericRecord] = {
   var schema = parser.parse(schemaString)
   var wrapper = new AvroKey[GenericRecord]()
   var record = new GenericData.Record(schema)
   record.put("empname","John")
    wrapper.datum(record)
    return wrapper 
  }
like image 95
Ramakrishna Avatar answered Mar 25 '23 12:03

Ramakrishna