Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka - Deserializing the object in Consumer

We are considering to use Kafka in our for messaging and our applications are developed using Spring. So, we have planned to use spring-kafka.

The producer puts the message as HashMap object into the queue. We have JSON serializer and we assumed that the map will be serialized and put into the queue. And here is the producer config.

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
        key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

On the other hand, we have a listener which listens to the same topic where the producer has published the message. Here is the consumer config:

spring:
   kafka:
       consumer:
            group-id: xyz
            key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

Our listener method:

  public void listener(SomeClass abx)

We were expecting the json will be de-serialized and an object of type "SomeClass" will be generated. But apparently, it throws de-serialization exception.

We saw few articles and the suggestion was to do something like:

 @Bean
  public ConsumerFactory<String, Car> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
        new JsonDeserializer<>(Car.class));
  }

We don't want to write some code for creating the Deserializer. Is there any boilerplate thing which we are missing? Any help will be appreciated!!

like image 563
Thiru Avatar asked Jun 29 '18 07:06

Thiru


People also ask

How do you serialize an object in Kafka?

In order to serialize our own objects, we'll implement the Serializer interface. Similarly, to create a custom deserializer, we'll implement the Deserializer interface. There are there methods available to override for both interfaces: configure: used to implement configuration details.

Why is serialization necessary in Kafka?

Serialization is important for Apache Kafka® because as mentioned above, a Kafka broker only works with bytes. Kafka stores records in bytes, and when a fetch request comes in from a consumer, Kafka returns records in bytes.

Can your Kafka consumers handle a poison pill?

The consumer is not able to handle the poison pill. The consumption of the topic partition is blocked because the consumer offset is not moving forward. The consumer will try again and again (very rapidly) to deserialize the record but will never succeed.


1 Answers

See the boot documentation. In particular:

You can also configure the Spring Kafka JsonDeserializer as follows:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice

spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

like image 98
Gary Russell Avatar answered Oct 26 '22 23:10

Gary Russell