
Skipping fields in a record using spark-avro

Update: the spark-avro package was updated to support this scenario: https://github.com/databricks/spark-avro/releases/tag/v3.1.0

I have an Avro file that was created by a third party outside my control, which I need to process using Spark. The Avro schema is a record where one of the fields is a map whose values are a mixed union type:

{
  "name" : "Properties",
  "type" : {
    "type" : "map",
    "values" : [ "long", "double", "string", "bytes" ]
  }
}

This is unsupported with the spark-avro reader:

In addition to the types listed above, it supports reading of three types of union types: union(int, long) union(float, double) union(something, null), where something is one of the supported Avro types listed above or is one of the supported union types.

Reading about Avro's schema evolution and resolution, I expected to be able to read the file while skipping the problematic field, by specifying a different reader schema that omits it. According to the Avro Schema Resolution docs, this should work:

if the writer's record contains a field with a name not present in the reader's record, the writer's value for that field is ignored.

So I tried reading with a modified reader schema:

 val df = sqlContext.read.option("avroSchema", avroSchema).avro(path)

where avroSchema is the exact same schema the writer used, but without the problematic field.
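For illustration, a trimmed-down reader schema might look like the sketch below. The record name "MyRecord" and the remaining "id" field are placeholders (the real reader schema is simply the writer's schema with the Properties field removed):

// Hypothetical reader schema: the writer's record minus the "Properties" field.
// "MyRecord" and "id" are placeholder names, not the real ones.
import com.databricks.spark.avro._   // adds the .avro(...) method to DataFrameReader

val avroSchema = """
{
  "type" : "record",
  "name" : "MyRecord",
  "fields" : [
    { "name" : "id", "type" : "string" }
  ]
}
"""

val df = sqlContext.read.option("avroSchema", avroSchema).avro(path)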

But I still get the same error regarding mixed union types.

Is this scenario of schema evolution supported in Avro? In spark-avro? Is there another way to achieve my goal?


Update: I have tested the same scenario (the same file, actually) with Apache Avro 1.8.1 and it works as expected, so the problem must be specific to spark-avro. Any ideas?
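A minimal sketch of reading with the plain Avro library and a reader schema, for reference; it assumes the trimmed schema is stored in reader.avsc and the data in data.avro (both placeholder paths):

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// The schema passed to the datum reader acts as the reader (expected) schema;
// DataFileReader takes the writer schema from the file header and resolves
// between the two, so the mixed-union field is simply skipped.
val readerSchema = new Schema.Parser().parse(new File("reader.avsc"))
val datumReader = new GenericDatumReader[GenericRecord](readerSchema)
val fileReader = DataFileReader.openReader(new File("data.avro"), datumReader)

while (fileReader.hasNext) {
  val record = fileReader.next()
  // record contains only the fields present in the reader schema
}
fileReader.close()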

asked Nov 03 '16 by itaysk




1 Answer

Update: the spark-avro package was updated to support this scenario: https://github.com/databricks/spark-avro/releases/tag/v3.1.0

The following does not actually answer my question; rather, it is a different solution to the same problem.

Since spark-avro currently does not have this functionality (see my comment on the question), I have instead used Avro's org.apache.avro.mapreduce and Spark's newAPIHadoopFile. Here is a simple example of that:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}

val path = "..."
val conf = new SparkConf().setAppName("avro test")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// Read the file as (AvroKey[GenericRecord], NullWritable) pairs using the
// official Avro MapReduce input format instead of spark-avro.
val avroRdd = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])

Contrary to spark-avro, the official Avro libraries support mixed union types and schema evolution.
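For completeness, here is a sketch of how the trimmed reader schema could be supplied to AvroKeyInputFormat through the Hadoop configuration, so that the mixed-union field is actually dropped during the read. It builds on the snippet above; avroSchemaJson is a placeholder for the trimmed schema string, and AvroJob.setInputKeySchema is the standard avro-mapreduce helper for setting the input key schema:

import org.apache.avro.Schema
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.mapreduce.Job

// The Job object is used only as a carrier for the Hadoop configuration.
// The trimmed reader schema (without the mixed-union field) drives Avro's
// schema resolution inside AvroKeyInputFormat.
val readerSchema = new Schema.Parser().parse(avroSchemaJson) // avroSchemaJson: placeholder JSON string
val job = Job.getInstance()
AvroJob.setInputKeySchema(job, readerSchema)

val avroRdd = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  job.getConfiguration)

// Each element is an (AvroKey[GenericRecord], NullWritable) pair. The input
// format reuses record objects, so extract (or copy) what you need before
// caching or collecting.
val jsonRecords = avroRdd.map { case (key, _) => key.datum().toString }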

answered Nov 04 '22 by itaysk