I have a schema that has been updated to include a new field. I'm using Avro reflection and the Confluent schema registry to serialize/deserialize data like so:
Serialization:
Schema schema = REFLECT_DATA.getSchema(value.getClass());
try {
    int registeredSchemaId = this.schemaRegistry.register(subject, schema);
    // Confluent wire format: magic byte 0, then the 4-byte schema id, then the Avro payload.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0);
    out.write(ByteBuffer.allocate(4).putInt(registeredSchemaId).array());
    DatumWriter<Object> dw = new ReflectDatumWriter<>(schema);
    Encoder encoder = ENCODER_FACTORY.directBinaryEncoder(out, null);
    dw.write(value, encoder);
    encoder.flush();
    return out.toByteArray();
} catch (RuntimeException | IOException e) {
    throw new SerializationException("Error serializing Avro message", e);
} catch (RestClientException e) {
    throw new SerializationException("Error registering Avro schema: " + schema, e);
}
Deserialization:
if (readerSchema == null) {
    readerSchema = new Schema.Parser().parse(schemaString);
}
int schemaId = -1;
try {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
    }
    schemaId = buffer.getInt();
    // Resolve the writer schema from the registry; the reader schema is the local one.
    Schema writerSchema = schemaRegistry.getById(schemaId);
    int start = buffer.position() + buffer.arrayOffset();
    int length = buffer.limit() - 1 - idSize;
    DatumReader<Object> reader = new ReflectDatumReader<>(writerSchema, readerSchema);
    BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
    return reader.read(null, decoder); // line 83, where the stack trace below points
} catch (IOException e) {
    throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
} catch (RestClientException e) {
    throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
}
The schema is defined by a scala case class, the old one looks like this:
case class Data(oldField: String) {
    def this() = this("")
}
and it has been updated like so:
case class Data(oldField: String, @AvroDefault("") newField: String) {
    def this() = this("", "")
}
However, deserializing sometimes throws an AvroTypeException with the following stack trace:
Caused by: org.apache.avro.AvroTypeException: Found com.company.project.DataHolder$.Data, expecting com.company.project.DataHolder$.Data
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
at io.fama.pubsub.KafkaAvroReflectDeserializer.deserialize(KafkaAvroReflectDeserializer.java:83)
I think this is caused by trouble deserializing messages that were written with the old schema (but I'm not entirely sure - I just can't see what else it could be). Has anyone else experienced this error, or does anyone have any ideas how to fix it?
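For what it's worth, here is a minimal, registry-free round trip that I'd expect to exercise the same schema resolution: write with the old schema, read with the new one, which declares a default for newField. The schema JSON and names here are just illustrative of my case classes, not taken from the real project:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class ResolutionCheck {
    public static void main(String[] args) throws Exception {
        // Old (writer) schema: only oldField.
        Schema oldSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Data\",\"fields\":[" +
            "{\"name\":\"oldField\",\"type\":\"string\"}]}");
        // New (reader) schema: adds newField with an empty-string default.
        Schema newSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Data\",\"fields\":[" +
            "{\"name\":\"oldField\",\"type\":\"string\"}," +
            "{\"name\":\"newField\",\"type\":\"string\",\"default\":\"\"}]}");

        // Write a record with the old schema.
        GenericData.Record record = new GenericData.Record(oldSchema);
        record.put("oldField", "hello");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        new GenericDatumWriter<Object>(oldSchema).write(record, encoder);
        encoder.flush();

        // Read it back with the new schema; newField should pick up its default.
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        DatumReader<Object> reader = new GenericDatumReader<>(oldSchema, newSchema);
        System.out.println(reader.read(null, decoder));
    }
}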
If you're using the org.apache.avro.reflect attributes, then I don't think you can use Scala case classes -- Scala case class params are immutable, and I believe the reflection mapper needs a class with a public no-arg constructor and Java-visible fields, possibly even @BeanProperty to generate Java-style getters/setters.
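As a sketch of what I mean, a plain Java equivalent along these lines should be safe for ReflectDatumReader/ReflectDatumWriter. This is illustrative only, and note that, as far as I know, the reflect @AvroDefault annotation expects the default value encoded as JSON, hence the escaped quotes:

import org.apache.avro.reflect.AvroDefault;

// Mutable, Java-visible fields plus a public no-arg constructor,
// which the reflect reader can instantiate and populate.
public class Data {
    private String oldField;

    // The reflect @AvroDefault takes the default as a JSON value,
    // so an empty-string default is the quoted "\"\"".
    @AvroDefault("\"\"")
    private String newField;

    public Data() {} // needed so ReflectDatumReader can create instances

    public String getOldField() { return oldField; }
    public void setOldField(String v) { oldField = v; }
    public String getNewField() { return newField; }
    public void setNewField(String v) { newField = v; }
}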