Update: the spark-avro package was updated to support this scenario. https://github.com/databricks/spark-avro/releases/tag/v3.1.0
I have an AVRO file, created by a third party outside my control, that I need to process using Spark. The AVRO schema is a record in which one of the fields is a map whose values are a mixed union type:
{
  "name" : "Properties",
  "type" : {
    "type" : "map",
    "values" : [ "long", "double", "string", "bytes" ]
  }
}
This is unsupported with the spark-avro reader:
In addition to the types listed above, it supports reading of three types of union types: union(int, long); union(float, double); and union(something, null), where something is one of the supported Avro types listed above or is one of the supported union types.
Reading about AVRO's schema evolution and resolution, I expect to be able to read the file while skipping the problematic field by specifying a different reader schema that omits this field. According to AVRO Schema Resolution docs, it should work:
if the writer's record contains a field with a name not present in the reader's record, the writer's value for that field is ignored.
So I modified my read call to pass a reader schema:
val df = sqlContext.read.option("avroSchema", avroSchema).avro(path)
where avroSchema is the exact same schema the writer used, but without the problematic field.
But I still get the same error regarding mixed union types.
Is this scenario of schema evolution supported with AVRO? With spark-avro? Is there another way to achieve my goal?
Update:
I have tested the same scenario (the same file, actually) with Apache Avro 1.8.1 and it works as expected, so the problem must be specific to spark-avro. Any ideas?
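For reference, a plain-Avro check of this kind could look roughly like the sketch below. It is not the code used for the test above: the record name `Event`, the `Id` field, and the `ReaderSchemaSketch` object are made up for illustration, and only the `Properties` field comes from the schema in the question. It writes one record with the full writer schema and reads it back with a reader schema that omits `Properties`.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object ReaderSchemaSketch {
  // Hypothetical writer schema: an "Id" field plus the mixed-union map from the question.
  val writerSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Event","fields":[
      |  {"name":"Id","type":"long"},
      |  {"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}}
      |]}""".stripMargin)

  // Reader schema: same record name, with the "Properties" field left out.
  val readerSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Event","fields":[{"name":"Id","type":"long"}]}""")

  def roundTrip(): GenericRecord = {
    // Build a record that uses two different branches of the union.
    val props = new java.util.HashMap[String, AnyRef]()
    props.put("count", java.lang.Long.valueOf(3L))
    props.put("label", "x")
    val rec = new GenericData.Record(writerSchema)
    rec.put("Id", java.lang.Long.valueOf(42L))
    rec.put("Properties", props)

    // Serialize with the writer schema.
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](writerSchema).write(rec, encoder)
    encoder.flush()

    // Deserialize with (writer, reader) schemas; the decoder skips "Properties".
    val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
    new GenericDatumReader[GenericRecord](writerSchema, readerSchema).read(null, decoder)
  }
}
```

The record returned by `ReaderSchemaSketch.roundTrip()` carries only the reader schema, so the writer's `Properties` value is dropped during resolution, as the Avro schema resolution rules quoted above describe.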
This does not actually answer my question; rather, it is a different solution to the same problem.
Since spark-avro currently does not have this functionality (see my comment on the question), I have instead used Avro's org.apache.avro.mapreduce and Spark's newAPIHadoopFile. Here is a simple example of that:
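import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}

val path = "..."
val conf = new SparkConf().setAppName("avro test")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
// Each element is an (AvroKey[GenericRecord], NullWritable) pair.
val avroRdd = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])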
Unlike spark-avro, the official Avro libraries support mixed union types and schema evolution.
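If you also want to drop the problematic field at read time, one option is to hand a reader schema to the input format through `AvroJob.setInputKeySchema`. The following is a minimal sketch under assumptions, not something taken from the original setup: the `ReaderSchemaRdd` object, the `readIds` helper, and the `Id` field are hypothetical names, and `readerSchemaJson` stands for your own reader schema with the field removed.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyInputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object ReaderSchemaRdd {
  // Hypothetical helper: reads the Avro file at `path` with the given reader
  // schema and returns the (assumed) long field "Id" of every record.
  def readIds(sc: SparkContext, path: String, readerSchemaJson: String): RDD[Long] = {
    // Job.getInstance copies the Hadoop configuration, so the SparkContext's
    // own configuration is left untouched.
    val job = Job.getInstance(sc.hadoopConfiguration)
    // AvroKeyInputFormat uses this schema as the reader schema.
    AvroJob.setInputKeySchema(job, new Schema.Parser().parse(readerSchemaJson))

    sc.newAPIHadoopFile(path,
        classOf[AvroKeyInputFormat[GenericRecord]],
        classOf[AvroKey[GenericRecord]],
        classOf[NullWritable],
        job.getConfiguration)
      // Extract plain values right away: the record reader may reuse the
      // AvroKey and GenericRecord objects between records.
      .map { case (key, _) => key.datum().get("Id").asInstanceOf[Long] }
  }
}
```

Pulling primitive values out of each record inside the first `map` avoids both the object-reuse problem and the need to serialize `GenericRecord` instances across the cluster.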