When attempting to write avro, I get the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 35.0 failed 1 times, most recent failure: Lost task 7.0 in stage 35.0 (TID 110, localhost): java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.avro.mapred.AvroWrapper
I had read in an avro file with 3 records using:
avro_rdd = sc.newAPIHadoopFile(
    "threerecords.avro",
    "org.apache.avro.mapreduce.AvroKeyInputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
    conf=None)

output = avro_rdd.map(lambda x: x[0]).collect()
Then I tried to write out a single record (output kept in avro) with:
conf = {"avro.schema.input.key": reduce(lambda x, y: x + y, sc.textFile("myschema.avsc", 1).collect())}
sc.parallelize([output[0]]).map(lambda x: (x, None)).saveAsNewAPIHadoopFile(
"output.avro",
"org.apache.avro.mapreduce.AvroKeyOutputFormat",
"org.apache.avro.mapred.AvroKey",
"org.apache.hadoop.io.NullWritable",
keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
conf=conf)
How do I get around that error and write out an individual Avro record successfully? I know my schema is correct because it comes from the Avro file itself.
It looks like this isn't supported at the moment. You are trying to use the Java map as an Avro record and convert it to a Java map again, which is why you get the error about the java.util.HashMap.
There is a pull request from staslos to add the Avro output format; see that pull request and its example.
A converter is required, currently missing from AvroConverters.scala, to convert the Java map back into the Avro format; a rough sketch of one is below.
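For illustration only, here is a minimal sketch of what such a reverse converter might look like. The class name JavaToAvroWrapperConverter, loading the schema from a classpath resource, and the flat-record assumption are all assumptions of mine, not the code from the pull request:

// Hypothetical sketch (not the PR's actual implementation): turn the
// java.util.Map coming back from the Python side into an
// AvroKey[GenericRecord] that AvroKeyOutputFormat can serialize.
import java.util.{Map => JMap}

import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.spark.api.python.Converter

class JavaToAvroWrapperConverter extends Converter[Any, Any] {
  // Assumption: the writer schema is available as a classpath resource.
  // A real converter would need to get it from the job configuration instead.
  private lazy val schema: Schema =
    new Schema.Parser().parse(getClass.getResourceAsStream("/myschema.avsc"))

  override def convert(obj: Any): Any = obj match {
    case m: JMap[_, _] =>
      // Copy each map entry into a GenericRecord field of the same name.
      // Only flat records with primitive values are handled here; nested
      // maps/arrays would need recursive conversion.
      val record: GenericRecord = new GenericData.Record(schema)
      m.asInstanceOf[JMap[String, Any]].asScala.foreach {
        case (field, value) => record.put(field, value)
      }
      new AvroKey[GenericRecord](record)
    case other =>
      throw new IllegalArgumentException(s"Unsupported type: ${other.getClass}")
  }
}

If you go this route, the class would have to be compiled onto the Spark classpath and passed as the keyConverter in saveAsNewAPIHadoopFile in place of AvroWrapperToJavaConverter.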