Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

KafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord

My KafkaProducer is able to use KafkaAvroSerializer to serialize objects to my topic. However, KafkaConsumer.poll() returns deserialized GenericRecord instead of my serialized class.

MyKafkaProducer

 KafkaProducer<CharSequence, MyBean> producer;
    try (InputStream props = Resources.getResource("producer.props").openStream()) {
      Properties properties = new Properties();
      properties.load(props);
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");

      MyBean bean = new MyBean();
      producer = new KafkaProducer<>(properties);
      producer.send(new ProducerRecord<>(topic, bean.getId(), bean));

My KafkaConsumer

 try (InputStream props = Resources.getResource("consumer.props").openStream()) {
      properties.load(props);
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");
      consumer = new KafkaConsumer<>(properties);
    }
    consumer.subscribe(Arrays.asList(topic));
    try {
      while (true) {
        ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
        if (records.isEmpty()) {
          continue;
        }
        for (ConsumerRecord<CharSequence, MyBean> record : records) {
          MyBean bean = record.value(); // <-------- This is throwing a cast Exception because it cannot cast GenericRecord to MyBean
          System.out.println("consumer received: " + bean);
        }
      }

MyBean bean = record.value(); That line throws a cast Exception because it cannot cast GenericRecord to MyBean.

I'm using kafka-client-0.9.0.1, kafka-avro-serializer-3.0.0.

like image 228
Glide Avatar asked Sep 21 '16 01:09

Glide


2 Answers

KafkaAvroDeserializer supports SpecificData

It's not enabled by default. To enable it:

properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

KafkaAvroDeserializer does not support ReflectData

Confluent's KafkaAvroDeserializer does not know how to deserialize using Avro ReflectData. I had to extend it to support Avro ReflectData:

/**
 * Extends deserializer to support ReflectData.
 *
 * @param <V>
 *     value type
 */
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {

  private Schema readerSchema;
  private DecoderFactory decoderFactory = DecoderFactory.get();

  protected ReflectKafkaAvroDeserializer(Class<V> type) {
    readerSchema = ReflectData.get().getSchema(type);
  }

  @Override
  protected Object deserialize(
      boolean includeSchemaAndVersion,
      String topic,
      Boolean isKey,
      byte[] payload,
      Schema readerSchemaIgnored) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema writerSchema = schemaRegistry.getByID(schemaId);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize;
      DatumReader<Object> reader = new ReflectDatumReader(writerSchema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}

Define a custom deserializer class which deserializes to MyBean:

public class MyBeanDeserializer extends ReflectKafkaAvroDeserializer<MyBean> {
  public MyBeanDeserializer() {
    super(MyBean.class);
  }
}

Configure KafkaConsumer to use the custom deserializer class:

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyBeanDeserializer.class);
like image 156
Chin Huang Avatar answered Nov 09 '22 01:11

Chin Huang


Edit : reflect data support got merged (see below)

To add to Chin Huang's answer, for minimal code and better performance, you should probably implement it this way :

/**
 * Extends deserializer to support ReflectData.
 *
 * @param <V>
 *     value type
 */
public abstract class SpecificKafkaAvroDeserializer<V extends SpecificRecordBase> extends AbstractKafkaAvroDeserializer implements Deserializer<V> {
  private final Schema schema;
  private Class<T> type;
  private DecoderFactory decoderFactory = DecoderFactory.get();

  protected SpecificKafkaAvroDeserializer(Class<T> type, Map<String, ?> props) {
    this.type = type;
    this.schema = ReflectData.get().getSchema(type);
    this.configure(this.deserializerConfig(props));
  }

  public void configure(Map<String, ?> configs) {
    this.configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  protected T deserialize(
          boolean includeSchemaAndVersion,
          String topic,
          Boolean isKey,
          byte[] payload,
          Schema readerSchemaIgnore) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema schema = schemaRegistry.getByID(schemaId);

      Schema readerSchema = ReflectData.get().getSchema(type);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize;
      SpecificDatumReader<T> reader = new SpecificDatumReader(schema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}
like image 1
Gepsens Avatar answered Nov 09 '22 01:11

Gepsens