Avro fails to deserialize message with updated schema

I have a schema which has been updated to include a new field. I'm using Avro reflection and the Confluent schema registry to serialize/deserialize data like so:

Serialization:

Schema schema = REFLECT_DATA.getSchema(value.getClass());
try {
    int registeredSchemaId = this.schemaRegistry.register(subject, schema);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
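    // Confluent wire format: a 0 magic byte, then the 4-byte schema id, then the Avro-encoded payload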
    out.write(0);
    out.write(ByteBuffer.allocate(4).putInt(registeredSchemaId).array());

    DatumWriter<Object> dw = new ReflectDatumWriter<>(schema);
    Encoder encoder = ENCODER_FACTORY.directBinaryEncoder(out, null);
    dw.write(value, encoder);
    encoder.flush();
    return out.toByteArray();
} catch (RuntimeException | IOException e) {
    throw new SerializationException("Error serializing Avro message", e);
} catch (RestClientException e) {
    throw new SerializationException("Error registering Avro schema: " + schema, e);
}

Deserialization:

if (readerSchema == null) {
    readerSchema = new Schema.Parser().parse(schemaString);
}

int schemaId = -1;
try {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
    }

    schemaId = buffer.getInt();
    Schema writerSchema = schemaRegistry.getById(schemaId);

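    // buffer.position() is now past the magic byte and the 4-byte schema id,
    // so the remaining bytes are the Avro-encoded record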
    int start = buffer.position() + buffer.arrayOffset();
    int length = buffer.limit() - 1 - idSize;
    DatumReader<Object> reader = new ReflectDatumReader<>(writerSchema, readerSchema);
    BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
    return reader.read(null, decoder);  //line 83
} catch (IOException e) {
    throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
} catch (RestClientException e) {
    throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
}

The schema is defined by a scala case class, the old one looks like this:

case class Data(oldField: String) {
    def this() = this("")
}

and it has been updated like so:

case class Data(oldField: String, @AvroDefault("") newField: String) {
    def this() = this("", "")
}

However, deserializing sometimes throws an AvroTypeException with the following stack:

Caused by: org.apache.avro.AvroTypeException: Found com.company.project.DataHolder$.Data, expecting com.company.project.DataHolder$.Data
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
    at io.fama.pubsub.KafkaAvroReflectDeserializer.deserialize(KafkaAvroReflectDeserializer.java:83)

I think this is caused by difficulty deserializing messages that were written with the old schema (but I'm not entirely sure - I just can't reason as to what else it could be). Has anyone else experienced this error, or does anyone have any ideas on how to fix it?

asked Nov 07 '22 by cscan


1 Answer

If you're using the org.apache.avro.reflect annotations, I don't think you can use Scala case classes: case class parameters are immutable, and I believe the reflection mapper needs a class with a public no-arg constructor and Java-visible fields, possibly even @BeanProperty to generate Java-style getters and setters.
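For example, a plain Scala class along these lines should be friendlier to the reflect mapper (a rough sketch, not tested against your setup):

import scala.beans.BeanProperty
import org.apache.avro.reflect.AvroDefault

// Mutable, Java-visible fields plus a public no-arg constructor so that
// ReflectDatumReader/ReflectDatumWriter can instantiate and populate the class.
class Data(@BeanProperty var oldField: String,
           @BeanProperty @AvroDefault("") var newField: String) {
    def this() = this("", "")
}

Depending on your Avro and Scala versions, you may also need the field meta-annotation, e.g. @(AvroDefault @field)(""), to make sure the annotation lands on the generated field rather than on the constructor parameter.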

answered Nov 11 '22 by SourceSimian