
Deserialize Avro messages into specific datum using KafkaAvroDecoder

I'm reading from a Kafka topic that contains Avro messages serialized using the KafkaAvroEncoder (which automatically registers the schemas for each topic in the Schema Registry). I'm using the maven-avro-plugin to generate plain Java classes from the schemas, and I'd like to use those classes when reading.

The KafkaAvroDecoder only supports deserializing into GenericData.Record types, which (in my opinion) misses the whole point of having a statically typed language. My deserialization code currently looks like this:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;

    SpecificDatumReader<event> reader = new SpecificDatumReader<>(
        event.getClassSchema() // event is my class generated from the schema
    );
    byte[] in = ...; // my input bytes
    ByteBuffer stuff = ByteBuffer.wrap(in);
    // the KafkaAvroEncoder puts a magic byte and the ID of the schema (as stored
    //   in the schema registry) before the serialized message
    if (stuff.get() != 0x0) {
        return;
    }
    int id = stuff.getInt();

    // let's just skip those special bytes: 1 magic byte + 4-byte schema ID
    int length = stuff.limit() - 4 - 1;
    int start = stuff.position() + stuff.arrayOffset();

    Decoder decoder = DecoderFactory.get().binaryDecoder(
        stuff.array(), start, length, null
    );
    try {
        event ev = reader.read(null, decoder);
    } catch (IOException e) {
        e.printStackTrace();
    }

I find this solution cumbersome, so I'd like to know whether there is a simpler way to do this.
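For reference, this is roughly what the GenericData.Record-style handling I'm trying to avoid looks like (the field name and the properties setup here are made up, just for illustration):

    import kafka.utils.VerifiableProperties;
    import io.confluent.kafka.serializers.KafkaAvroDecoder;
    import org.apache.avro.generic.GenericRecord;

    VerifiableProperties vProps = ...; // schema.registry.url, etc.
    KafkaAvroDecoder decoder = new KafkaAvroDecoder(vProps);
    // fromBytes returns an Object that is a GenericData.Record by default
    GenericRecord record = (GenericRecord) decoder.fromBytes(in);
    Object userId = record.get("userId"); // untyped, string-keyed field access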

1 Answer

Thanks to the comment I was able to find the answer. The secret was to instantiate the KafkaAvroDecoder with a Properties object that enables the specific Avro reader, that is:

    import java.util.Properties;
    import kafka.utils.VerifiableProperties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroDecoder;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "...");
    // the important part: return SpecificRecord instances instead of GenericData.Record
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    VerifiableProperties vProps = new VerifiableProperties(props);

    KafkaAvroDecoder decoder = new KafkaAvroDecoder(vProps);
    MyLittleData data = (MyLittleData) decoder.fromBytes(input);

The same configuration applies when using the KafkaConsumer&lt;K, V&gt; class directly. (I'm consuming from Kafka in Storm using the KafkaSpout from the storm-kafka project, which uses the SimpleConsumer, so I have to deserialize the messages manually. For the courageous, there is the storm-kafka-client project, which does this automatically by using the new-style consumer.)
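For completeness, a rough sketch of what that would look like with the new-style consumer and the KafkaAvroDeserializer (the topic name, group id and the generated MyLittleData type are just placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "...");
    // same flag as above: deserialize into the generated SpecificRecord classes
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

    KafkaConsumer<String, MyLittleData> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my-topic"));
    ConsumerRecords<String, MyLittleData> records = consumer.poll(100);
    for (ConsumerRecord<String, MyLittleData> record : records) {
        MyLittleData data = record.value();
    }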
