This is effectively the same as my previous question, but using Avro rather than JSON as the data format.
I'm working with a Spark dataframe which could be loading data from one of a few different schema versions:
// Version One
{"namespace": "com.example.avro",
"type": "record",
"name": "MeObject",
"fields": [
{"name": "A", "type": ["null", "int"], "default": null}
]
}
// Version Two
{"namespace": "com.example.avro",
"type": "record",
"name": "MeObject",
"fields": [
{"name": "A", "type": ["null", "int"], "default": null},
{"name": "B", "type": ["null", "int"], "default": null}
]
}
I'm using Spark Avro to load the data.
DataFrame df = context.read()
.format("com.databricks.spark.avro")
.load("path/to/avro/file");
which may be a Version One file or Version Two file. However I'd like to be able to process it in an identical manner, with the unknown values set to "null". The recommendation in my previous question was to set the schema, however I do not want to repeat myself writing the schema in both a .avro
file and as sparks StructType
and friends. How can I convert the avro schema (either text file or the generated MeObject.getClassSchema()
) into sparks StructType
?
Spark Avro has a SchemaConverters
, but it is all private and returns some strange internal object.
Disclaimer: It's kind of a dirty hack. It depends on a few things:
Following code reads an Avro schema file, creates an empty Avro file with given schema, reads it using spark-csv
and outputs Spark schema as a JSON file.
import argparse
import tempfile
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from pyspark import SparkContext
from pyspark.sql import SQLContext
def parse_schema(schema):
with open(schema) as fr:
return avro.schema.parse(open(schema).read())
def write_dummy(schema):
tmp = tempfile.mktemp(suffix='.avro')
with open(tmp, "w") as fw:
writer = DataFileWriter(fw, DatumWriter(), schema)
writer.close()
return tmp
def write_spark_schema(path, schema):
with open(path, 'w') as fw:
fw.write(schema.json())
def main():
parser = argparse.ArgumentParser(description='Avro schema converter')
parser.add_argument('--schema')
parser.add_argument('--output')
args = parser.parse_args()
sc = SparkContext('local[1]', 'Avro schema converter')
sqlContext = SQLContext(sc)
df = (sqlContext.read.format('com.databricks.spark.avro')
.load(write_dummy(parse_schema(args.schema))))
write_spark_schema(args.output, df.schema)
sc.stop()
if __name__ == '__main__':
main()
Usage:
bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 \
avro_to_spark_schema.py \
--schema path_to_avro_schema.avsc \
--output path_to_spark_schema.json
Read schema:
import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}
val json: String = Source.fromFile("schema.json").getLines.toList.head
val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType]
pls see if this helps, although little late. I was trying this hard for my current work. I have used schemaconverter from Databricks. I suppose, you were trying to read the avro file with the given schema.
val schemaObj = new Schema.Parser().parse(new File(avscfilepath));
var sparkSchema : StructType = new StructType
import scala.collection.JavaConversions._
for(field <- schemaObj.getFields()){
sparkSchema = sparkSchema.add(field.name, SchemaConverters.toSqlType(field.schema).dataType)
}
sparkSchema
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With