I have been playing with the Confluent distribution of Kafka for a few days to get a better understanding of the platform. I am getting serialization exceptions for some malformed Avro messages sent to one topic. Let me explain the problem with the facts:
<kafka.new.version>0.10.2.0-cp1</kafka.new.version>
<confluent.version>3.2.0</confluent.version>
<avro.version>1.7.7</avro.version>
Intention: Very simple. The producer sends Avro records, and the consumer should consume all records without any issue (it may skip any messages that are not compatible with the schema in the Schema Registry). Usage:
Producer ->
Key -> StringSerializer
Value -> KafkaAvroSerializer
Consumer ->
Key -> StringDeserializer
Value -> KafkaAvroDeserializer
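For context, here is a minimal producer setup matching the above; a sketch only, where the topic name is taken from the stack trace below and avroRecord stands in for whatever record instance is being sent:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "somehost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "schemaregistryhost:8081");

KafkaProducer<String, Object> producer = new KafkaProducer<>(props);
// avroRecord is a placeholder for the Avro record being produced
producer.send(new ProducerRecord<>("topic.ongo.test3.user14", "some-key", avroRecord));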
Other consumer properties (just for reference):
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "somehost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myconsumer-4");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "someclient-4");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
properties.put("schema.registry.url", "schemaregistryhost:8081");
I was able to consume messages without any issues until another producer mistakenly sent one message to this topic, which changed the latest schema version in the Schema Registry. (We have enabled an option in the Schema Registry that effectively disables compatibility checks, so any message can be sent to the topic and the Schema Registry registers a new schema version every time; we can switch it off too.)
Now, because of this one bad message, poll() fails with a serialization exception. It does give me the offset where it fails, and I can skip past that offset using seek() (sketched below), but that does not sound good. I also tried setting max poll records to 10 and a very small poll() timeout so that I could skip at most 10 records per poll by catching the exception, but for some reason max-records has no effect and the code fails with the serialization error immediately, even when I start from the beginning and the bad message is at offset 240.
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
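For reference, the seek()-based skip I mentioned looks roughly like this; it parses the failing partition and offset out of the exception message, which is brittle and tied to this client version's message format (a sketch, not something I want in production):
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

while (true) {
    try {
        ConsumerRecords<String, Object> records = consumer.poll(1000);
        for (ConsumerRecord<String, Object> record : records) {
            // handle the record
        }
    } catch (SerializationException e) {
        // The client reports e.g. "Error deserializing key/value for
        // partition topic.ongo.test3.user14-0 at offset 220"; parse out
        // the partition and offset and seek one past the poison record.
        Matcher m = Pattern.compile("partition (.+)-(\\d+) at offset (\\d+)")
                .matcher(String.valueOf(e.getMessage()));
        if (m.find()) {
            TopicPartition tp = new TopicPartition(m.group(1), Integer.parseInt(m.group(2)));
            consumer.seek(tp, Long.parseLong(m.group(3)) + 1);
        } else {
            throw e; // not the message format we expect, rethrow
        }
    }
}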
Another simple solution is to use ByteArrayDeserializer for the value in the consumer and call KafkaAvroDecoder in my application, so I can deal with the deserialization issue myself, per record.
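A sketch of that fallback, assuming a rawConsumer configured with ByteArrayDeserializer for values and using KafkaAvroDeserializer directly in place of the older KafkaAvroDecoder:
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.SerializationException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

KafkaAvroDeserializer avroDeserializer = new KafkaAvroDeserializer();
avroDeserializer.configure(
        Collections.singletonMap("schema.registry.url", "schemaregistryhost:8081"),
        false); // false = configure as a value deserializer

ConsumerRecords<String, byte[]> records = rawConsumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
    try {
        Object value = avroDeserializer.deserialize(record.topic(), record.value());
        // handle the decoded record
    } catch (SerializationException e) {
        // log and skip just this malformed record; poll() itself never fails
    }
}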
I believe there is something I am missing or doing incorrectly. Here is the exception, too:
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic.ongo.test3.user14-0 at offset 220
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 186
Caused by: org.apache.avro.AvroTypeException: Found com.catapult.TestUser, expecting com.catapult.TestUser, missing required field testname
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:131)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:869)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
I found that there is already an open JIRA ticket on the same issue: https://issues.apache.org/jira/browse/KAFKA-4740