Spring-Kafka cannot convert AVRO GenericData.Record to Acknowledgment

Using Spring Boot, I am trying to set up my Kafka consumers in batch receiving mode:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter()); // I know this one won't work
    factory.setBatchListener(true);
    return factory;
}

@Bean
public ConsumerFactory<String, GenericData.Record> consumerFactory() { // key type aligned with the container factory above
    Map<String, Object> dataRiverProps = getDataRiverProps();
    dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
    return new DefaultKafkaConsumerFactory<>(dataRiverProps);
}

And this is what the actual consumer looks like:

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeAvro(List<GenericData.Record> list, Acknowledgment ack) {
    messageProcessor.addMessageBatchToExecutor(list);
    while (messageProcessor.getTaskSize() > EXECUTOR_TASK_COUNT_THRESHOLD) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
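            // Log the exception itself: getCause() is null for an interrupted sleep.
            LOGGER_ERROR.error(ExceptionUtils.getStackTrace(e));
            Thread.currentThread().interrupt(); // restore the interrupt flag
            break; // stop waiting; otherwise sleep() would throw again immediately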
        }
    }
}

The exceptions I am getting look like this:

nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.apache.avro.generic.GenericData$Record] to type [org.springframework.kafka.support.Acknowledgment]
        at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:46)
        at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:191)
        at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:174)
        at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:66)

The Kafka messages are Avro messages, and I would like to retrieve them as JSON strings. Is there a ready-to-use Avro converter for GenericData.Record that I can plug into the ConcurrentKafkaListenerContainerFactory? Thanks!

Hua asked Mar 04 '23 13:03


1 Answer

Just add the property below to your Kafka consumer config:

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
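
For context, here is a minimal sketch of where that property could sit in the consumer properties. The question does not show getDataRiverProps(), so everything else in the map is an assumption: String keys, the Confluent KafkaAvroDeserializer for values, a schema registry URL, and auto-commit disabled because the listener takes an Acknowledgment. The class name AvroConsumerProps is hypothetical, and literal property keys are used so the snippet compiles without the Kafka or Confluent jars.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds consumer properties for an Avro consumer. */
final class AvroConsumerProps {

    private AvroConsumerProps() {
    }

    static Map<String, Object> build(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> props = new HashMap<>();
        // Same key as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // Assumption: String keys and Confluent Avro values (not shown in the question)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Assumption: the Confluent deserializer needs a schema registry to resolve schemas
        props.put("schema.registry.url", schemaRegistryUrl);
        // Same key as KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
        props.put("specific.avro.reader", "true");
        // Assumption: offsets are committed through the Acknowledgment, not by the client
        props.put("enable.auto.commit", "false");
        return props;
    }
}
```

The resulting map can be passed to new DefaultKafkaConsumerFactory<>(props), as in the question's consumerFactory(). Note that with specific.avro.reader set to true the Confluent deserializer tries to return generated SpecificRecord classes rather than GenericData.Record, so the listener's parameter type would probably need to change as well. Separately, this exception is commonly reported when the container is not in a manual ack mode (AckMode.MANUAL or MANUAL_IMMEDIATE), so that may be worth checking too.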

sapan prajapati answered Apr 06 '23 00:04