Is the Avro SpecificRecord (i.e. the generated Java classes) compatible with schema evolution? That is, if I have a source of Avro messages (in my case, Kafka) and I want to deserialize those messages to a SpecificRecord, is it possible to do so safely?
What I see:
Even if the messages are compatible, this is a problem.
If I can find the new schema (using e.g. Confluent Schema Registry), I can deserialize to GenericRecord, but there doesn't seem to be a way to map from a GenericRecord to a SpecificRecord of a different schema.
MySpecificType message = (MySpecificType) SpecificData.get().deepCopy(MySpecificType.SCHEMA$, genericMessage);
deepCopy is mentioned in various places, but it copies fields by index, so it doesn't work when the two schemas differ.
Is there any safe way to map between two Avro objects when you have both schemas and they are compatible? Even mapping from GenericRecord to GenericRecord would do, as I could then use the deepCopy trick to complete the job.
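To make the index problem concrete, here is a minimal plain-Java sketch that does not use Avro at all. The names NameBasedCopy, Field and resolve are hypothetical and exist only for illustration. It follows the rule from Avro's schema resolution that record fields are matched by name: writer fields the reader does not know are dropped, and reader fields the writer did not send fall back to the reader's default.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NameBasedCopy {

    /** Hypothetical stand-in for a reader-schema field: a name plus an optional default. */
    static final class Field {
        final String name;
        final Object defaultValue;
        final boolean hasDefault;

        private Field(String name, Object defaultValue, boolean hasDefault) {
            this.name = name;
            this.defaultValue = defaultValue;
            this.hasDefault = hasDefault;
        }

        static Field required(String name) {
            return new Field(name, null, false);
        }

        static Field withDefault(String name, Object defaultValue) {
            return new Field(name, defaultValue, true);
        }
    }

    /**
     * Builds a record in the reader's field order, looking each value up
     * by field name rather than by position in the writer's record.
     */
    static Map<String, Object> resolve(Map<String, Object> written, List<Field> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Field field : readerFields) {
            if (written.containsKey(field.name)) {
                result.put(field.name, written.get(field.name));
            } else if (field.hasDefault) {
                result.put(field.name, field.defaultValue);
            } else {
                throw new IllegalStateException("No value and no default for field: " + field.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Writer's layout: different order, plus a field the reader does not know.
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("name", "Ann");
        written.put("id", 7);
        written.put("legacy", "x");

        List<Field> readerFields = Arrays.asList(
                Field.required("id"),
                Field.required("name"),
                Field.withDefault("email", "unknown"));

        // Prints {id=7, name=Ann, email=unknown}
        System.out.println(resolve(written, readerFields));
    }
}
```

A copy by index would have put "Ann" into the id slot here. In Avro itself, one approach that applies the same resolution is to re-encode the GenericRecord with its writer schema and decode the bytes with a SpecificDatumReader constructed with both the writer and the reader schema; treat that as a pointer to try, not as a tested recipe.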
SpecificRecord is an interface from the Avro library that allows us to use an Avro record as a POJO. This is done by generating a Java class (or classes) from the schema, for example with avro-maven-plugin. The generated class implements the SpecificRecord interface.
The Kafka Avro Serializer keeps a cache of the schemas registered in the Schema Registry and their schema IDs. Consumers receive payloads and deserialize them with Kafka Avro Deserializers, which use the Confluent Schema Registry. The Deserializer looks up the full schema by ID, first in the cache and then in the Schema Registry.
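The lookup by ID can be sketched in plain Java. This is not the Confluent implementation: WireFormatHeader, schemaIdOf and lookupSchema are hypothetical names, schemas are represented as plain strings, and the registry is modelled as a simple function. The one fact it relies on is the Confluent wire format, in which each payload starts with a magic byte 0 followed by the schema ID as a 4-byte big-endian integer.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class WireFormatHeader {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /** Reads the schema ID that follows the magic byte at the start of the payload. */
    static int schemaIdOf(byte[] payload) {
        if (payload == null || payload.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Payload is too short to contain a schema ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    /**
     * Returns the schema for the payload, asking the registry only when
     * the ID is not in the local cache yet.
     */
    static String lookupSchema(byte[] payload,
                               Map<Integer, String> cache,
                               Function<Integer, String> registry) {
        int schemaId = schemaIdOf(payload);
        return cache.computeIfAbsent(schemaId, registry);
    }

    public static void main(String[] args) {
        byte[] payload = {0, 0, 0, 0, 42, 1, 2, 3}; // header for schema ID 42, then data
        Map<Integer, String> cache = new HashMap<>();
        // Stand-in for a real Schema Registry call.
        Function<Integer, String> registry = id -> "schema-" + id;
        System.out.println(lookupSchema(payload, cache, registry)); // prints schema-42
    }
}
```

The schema found this way is the writer's schema, which is why a consumer can still decode messages written with a newer or older version than the one it was compiled against.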
By default, KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG is set to false, so your KafkaAvroDeserializer will produce a GenericData$Record, and not your desired object (the Avro-generated class).
As @JARC said, you can enable it programmatically.
If you are using it in a Spring Boot project, set it this way:
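As a rough sketch of the programmatic route, the consumer settings can be assembled with plain string keys, so the snippet compiles without the Kafka or Confluent jars. SpecificConsumerProps and its build method are hypothetical names, and the broker address, registry URL and group ID in main are placeholders. The key specific.avro.reader is the string behind SPECIFIC_AVRO_READER_CONFIG.

```java
import java.util.Properties;

public class SpecificConsumerProps {

    /** Builds consumer properties that ask the Avro deserializer for generated classes. */
    static Properties build(String bootstrapServers, String schemaRegistryUrl, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        // Without this, the deserializer returns GenericData.Record instances.
        props.setProperty("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses; replace with the values of your environment.
        Properties props = build("localhost:9092", "http://localhost:8081", "example-group");
        props.list(System.out);
    }
}
```

The resulting Properties object can then be passed to the KafkaConsumer constructor, provided the generated classes are on the consumer's classpath.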
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true
There are example tests here for specific data type conversion. It's all in the configuration 'specificDeserializerProps':
https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroSerializerTest.java
I added the following config and got the specific type out as wanted.
HashMap<String, String> specificDeserializerProps = new HashMap<String, String>();
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
specificAvroDeserializer = new KafkaAvroDeserializer(schemaRegistry, specificDeserializerProps);
Hope that helps