I'm reading from a Kafka topic, which contains Avro messages serialized using the KafkaAvroEncoder
(which automatically registers the schemas with the Schema Registry under the topic's subject). I'm using the avro-maven-plugin to generate plain Java classes, which I'd like to use when reading.
The KafkaAvroDecoder
only supports deserializing into GenericData.Record
types, which (in my opinion) misses the whole point of having a statically typed language. My deserialization code currently looks like this:
SpecificDatumReader<event> reader = new SpecificDatumReader<>(
event.getClassSchema() // event is my class generated from the schema
);
byte[] in = ...; // my input bytes;
ByteBuffer stuff = ByteBuffer.wrap(in);
// the KafkaAvroEncoder puts a magic byte and the ID of the schema (as stored
// in the schema-registry) before the serialized message
if (stuff.get() != 0x0) {
return;
}
int id = stuff.getInt();
// skip the header: 1 magic byte + 4-byte schema ID
int length = stuff.limit() - 4 - 1;
int start = stuff.position() + stuff.arrayOffset();
Decoder decoder = DecoderFactory.get().binaryDecoder(
stuff.array(), start, length, null
);
try {
event ev = reader.read(null, decoder);
} catch (IOException e) {
e.printStackTrace();
}
I find this solution cumbersome, so I'd like to know whether there is a simpler way to do this.
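For reference, the framing the code above works around is the Confluent wire format: one magic byte (0x0), a 4-byte big-endian schema ID, then the raw Avro payload. A minimal sketch of parsing that header with the plain JDK (the class and method names here are my own, not part of any library):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Parses the Confluent wire format: magic byte 0x0, then a 4-byte
// big-endian schema ID, then the Avro-encoded payload.
public class WireFormat {
    public static final byte MAGIC_BYTE = 0x0;

    // Returns the schema-registry ID embedded in the message header.
    public static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt(); // ByteBuffer is big-endian by default
    }

    // Returns the Avro payload with the 5-byte header stripped.
    public static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, 5, message.length);
    }

    public static void main(String[] args) {
        byte[] msg = {0x0, 0, 0, 0, 42, 1, 2, 3};
        System.out.println(schemaId(msg));       // 42
        System.out.println(payload(msg).length); // 3
    }
}
```

The payload returned here is what would be handed to the SpecificDatumReader, avoiding the manual offset arithmetic.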
Thanks to the comment I was able to find the answer. The secret was to instantiate KafkaAvroDecoder
with a Properties
object that specifies the use of the specific Avro reader, that is:
Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "...");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
VerifiableProperties vProps = new VerifiableProperties(props);
KafkaAvroDecoder decoder = new KafkaAvroDecoder(vProps);
MyLittleData data = (MyLittleData) decoder.fromBytes(input);
The same configuration applies when using the KafkaConsumer<K, V>
class directly. (I'm consuming from Kafka in Storm through the KafkaSpout
from the storm-kafka project, which uses the SimpleConsumer,
so I have to deserialize the messages manually. For the courageous, there is the storm-kafka-client project, which does this automatically by using the new-style consumer.)