I'm working with Avro 1.7.0 using its Java generic representation API, and I have a problem with a case of schema evolution we're dealing with. The scenario is making a primitive-type field optional by changing the field to a union of null and that primitive type.
I'm going to use a simple example. Basically, the two versions of the field's schema are:
Version 1: int
Version 2: a union of null and int
According to the schema resolution chapter of Avro's spec, the resolution for such a case should be:
if reader's is a union, but writer's is not
The first schema in the reader's union that matches the writer's schema is recursively resolved against it. If none match, an error is signalled.
My interpretation is that data serialized with the initial schema should resolve properly, since int is part of the union in the reader's schema.
However, when running a test that reads back a record serialized with version 1 using version 2, I get:
org.apache.avro.AvroTypeException: Attempt to process a int when a union was expected.
Here's a test that shows exactly this:
@Test
public void testReadingUnionFromValueWrittenAsPrimitive() throws Exception {
    Schema writerSchema = new Schema.Parser().parse("{\n" +
            " \"type\":\"record\",\n" +
            " \"name\":\"NeighborComparisons\",\n" +
            " \"fields\": [\n" +
            "   {\"name\": \"test\",\n" +
            "    \"type\": \"int\" }]} ");
    Schema readersSchema = new Schema.Parser().parse(" {\n" +
            " \"type\":\"record\",\n" +
            " \"name\":\"NeighborComparisons\",\n" +
            " \"fields\": [ {\n" +
            "   \"name\": \"test\",\n" +
            "   \"type\": [\"null\", \"int\"],\n" +
            "   \"default\": null } ] }");

    // Writing a record using the initial schema with the
    // test field defined as an int
    GenericData.Record record = new GenericData.Record(writerSchema);
    record.put("test", Integer.valueOf(10));
    ByteArrayOutputStream output = new ByteArrayOutputStream();
    JsonEncoder jsonEncoder = EncoderFactory.get()
            .jsonEncoder(writerSchema, output);
    GenericDatumWriter<GenericData.Record> writer =
            new GenericDatumWriter<GenericData.Record>(writerSchema);
    writer.write(record, jsonEncoder);
    jsonEncoder.flush();
    output.flush();
    System.out.println(output.toString());

    // We try reading it back using the second schema
    // version where the test field is defined as a union of null and int
    JsonDecoder jsonDecoder = DecoderFactory.get()
            .jsonDecoder(readersSchema, output.toString());
    GenericDatumReader<GenericData.Record> reader =
            new GenericDatumReader<GenericData.Record>(writerSchema,
                    readersSchema);
    GenericData.Record read = reader.read(null, jsonDecoder);

    // We should be able to assert that the value is 10, but it
    // fails on reading the record before getting here
    assertEquals(10, read.get("test"));
}
I would like to know either whether my expectations are correct (this should resolve successfully, right?) or where I'm not using Avro properly to handle such a scenario.
Schema evolution allows you to update the schema used to write new data, while maintaining backwards compatibility with the schema(s) of your old data. Then you can read it all together, as if all of the data has one schema. Of course there are precise rules governing the changes allowed, to maintain compatibility.
A union indicates that a field might have more than one data type. For example, a union might indicate that a field can be a string or a null. A union is represented as a JSON array containing the data types.
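For example, the ["null", "int"] union used in the reader's schema above can also be built programmatically; here is a minimal sketch with Avro's Schema API:

import org.apache.avro.Schema;
import java.util.Arrays;

// Build the equivalent of the JSON union ["null", "int"]:
// a field with this schema may hold either null or an int.
Schema optionalInt = Schema.createUnion(Arrays.asList(
        Schema.create(Schema.Type.NULL),
        Schema.create(Schema.Type.INT)));
System.out.println(optionalInt); // prints ["null","int"]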
The expectation is correct: a value written with a primitive schema should resolve against a reader's schema that is a union of null and that primitive.
The problem with the code above is how the decoder is created: the decoder needs the writer's schema rather than the reader's schema.
Rather than doing this:
JsonDecoder jsonDecoder = DecoderFactory.get().
jsonDecoder(readersSchema, output.toString());
It should be like this:
JsonDecoder jsonDecoder = DecoderFactory.get().
jsonDecoder(writerSchema, output.toString());
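For clarity, here is the corrected decode path from the test above in one place; with the decoder built from the writer's schema, the GenericDatumReader performs the resolution and the int value is read into the union field:

// Decode with the writer's schema; the reader resolves against the reader's schema.
JsonDecoder jsonDecoder = DecoderFactory.get()
        .jsonDecoder(writerSchema, output.toString());
GenericDatumReader<GenericData.Record> reader =
        new GenericDatumReader<GenericData.Record>(writerSchema, readersSchema);
GenericData.Record read = reader.read(null, jsonDecoder);
assertEquals(10, read.get("test")); // the int resolves into the ["null", "int"] union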
Credit goes to Doug Cutting for the answer on Avro's user mailing list: http://mail-archives.apache.org/mod_mbox/avro-user/201208.mbox/browser