Avro Schema to Spark StructType

This is effectively the same as my previous question, but using Avro rather than JSON as the data format.

I'm working with a Spark DataFrame which could be loading data written with one of a few different schema versions:

// Version One
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null}
 ]
}

// Version Two
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null},
     {"name": "B", "type": ["null", "int"], "default": null}
 ]
}

I'm using Spark Avro to load the data.

DataFrame df = context.read()
  .format("com.databricks.spark.avro")
  .load("path/to/avro/file");

which may be a Version One file or a Version Two file. However, I'd like to be able to process both in an identical manner, with the unknown values set to null. The recommendation in my previous question was to set the schema, but I do not want to repeat myself by writing the schema both in an .avsc file and as Spark's StructType and friends. How can I convert the Avro schema (either the text file or the generated MeObject.getClassSchema()) into Spark's StructType?
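
Roughly, what I'm after is a sketch like the one below, where sparkSchema stands for the StructType derived from the Avro schema; deriving it is exactly the missing piece, and I'm assuming the reader honors a user-specified schema as recommended in my previous question:

val df = sqlContext.read
  .format("com.databricks.spark.avro")
  .schema(sparkSchema)  // hypothetical: the StructType converted from the Avro schema
  .load("path/to/avro/file")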

Spark Avro has SchemaConverters, but it is all private and returns a strange internal object.

asked Nov 24 '15 by Michael Lloyd Lee (mlk)


2 Answers

Disclaimer: It's kind of a dirty hack. It depends on a few things:

  • Python provides a lightweight Avro processing library and, thanks to its dynamism, it doesn't require typed writers
  • an empty Avro file is still a valid document
  • a Spark schema can be converted to and from JSON (see the round-trip sketch right after this list)
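
To illustrate the last point, a minimal round-trip sketch in Scala (the field name A is borrowed from the question):

import org.apache.spark.sql.types._

val st = StructType(Seq(StructField("A", IntegerType, nullable = true)))
val roundTripped = DataType.fromJson(st.json).asInstanceOf[StructType]
assert(roundTripped == st)  // the schema survives the JSON round trip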

The following code reads an Avro schema file, creates an empty Avro file with the given schema, reads it using spark-avro, and outputs the Spark schema as a JSON file.

import argparse
import tempfile

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

from pyspark import SparkContext
from pyspark.sql import SQLContext

def parse_schema(schema):
    # Parse the Avro schema from the given file
    with open(schema) as fr:
        return avro.schema.parse(fr.read())

def write_dummy(schema):
    # Write an empty Avro file that carries only the schema
    tmp = tempfile.mktemp(suffix='.avro')
    with open(tmp, "wb") as fw:
        writer = DataFileWriter(fw, DatumWriter(), schema)
        writer.close()
    return tmp

def write_spark_schema(path, schema):
    with open(path, 'w') as fw:
        fw.write(schema.json())


def main():
    parser = argparse.ArgumentParser(description='Avro schema converter')
    parser.add_argument('--schema')
    parser.add_argument('--output')
    args = parser.parse_args()

    sc = SparkContext('local[1]', 'Avro schema converter')
    sqlContext = SQLContext(sc)

    df = (sqlContext.read.format('com.databricks.spark.avro')
            .load(write_dummy(parse_schema(args.schema))))

    write_spark_schema(args.output, df.schema)
    sc.stop()


if __name__ == '__main__':
    main()

Usage:

bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 \
   avro_to_spark_schema.py \
   --schema path_to_avro_schema.avsc \
   --output path_to_spark_schema.json
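
For the Version One schema above, the resulting JSON should look more or less like this (a sketch; the exact output can vary between Spark versions):

{"type":"struct","fields":[{"name":"A","type":"integer","nullable":true,"metadata":{}}]}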

Read the schema back on the Scala side:

import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}

val json: String = Source.fromFile("schema.json").getLines.toList.head
val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType]

answered by zero323


Please see if this helps, although it's a little late. I was trying hard to get this working for my current project. I used SchemaConverters from Databricks. I assume you were trying to read an Avro file with the given schema.

import java.io.File
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
import com.databricks.spark.avro.SchemaConverters
import scala.collection.JavaConversions._

val schemaObj = new Schema.Parser().parse(new File(avscfilepath))
var sparkSchema: StructType = new StructType
// add a Spark field for every field of the Avro record
for (field <- schemaObj.getFields) {
  sparkSchema = sparkSchema.add(field.name, SchemaConverters.toSqlType(field.schema).dataType)
}
sparkSchema
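
Alternatively (a sketch, not part of the original answer): SchemaConverters.toSqlType returns a SchemaType wrapper, and since an Avro record converts to a StructType directly, the whole conversion can be a single call:

val sparkSchema = SchemaConverters.toSqlType(schemaObj).dataType.asInstanceOf[StructType]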

answered by hadooper