Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring kafka Consume multiple Message types in one consumer

I have multiple producers that can send multiple type of event to one kafka topic.

And I have a consumer that must consume all type of message. with different logic for every type of message.

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(Message<EventOne> event) {
    logger.info("event={}", event);
}

But in this case all messages come to this method not only EventOne

If I implement two method (for every type of message) then all messages come to only one method.

If i implement listener like this:

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(Message<?> event) {
    logger.info("event={}", event);
}

then I get exception: org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-listener-1] ERROR org.springframework.kafka.listener.LoggingErrorHandler - Error while processing: ConsumerRecord java.lang.IllegalArgumentException: Unrecognized Type: [null]

Please tell how I can implement multiple type consumer?

like image 774
rminko Avatar asked Mar 11 '23 09:03

rminko


1 Answers

I found the soultion and it is very simple. So, it is not clear from the question, but I use jsonMessageConverter like this:

    @Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

The Jackson library by default do not adds class information for JSON therefore it can't guess what type of i want to deserialize. Solution is the annotation

@JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include= JsonTypeInfo.As.PROPERTY, property="class")

that adds classinformation to json string. On the consumer side I just write the base class of my event

    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
public void handleEvent(BasicEvent event) {

    if (event instanceof EventOne) {
        logger.info("type={EventOne}, event={}", event);
    } else {
        logger.info("type={EventTwo}, event={}", event);
    }
}

Hope this info helps someone.

like image 179
rminko Avatar answered Mar 24 '23 18:03

rminko