Using Spring Boot, I am trying to set up my Kafka consumers in batch receiving mode:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter()); // I know this one won't work
    factory.setBatchListener(true);
    return factory;
}

@Bean
public ConsumerFactory<GenericData.Record, GenericData.Record> consumerFactory() {
    Map<String, Object> dataRiverProps = getDataRiverProps();
    dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
    return new DefaultKafkaConsumerFactory<>(dataRiverProps);
}
And this is what the actual consumer looks like:
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeAvro(List<GenericData.Record> list, Acknowledgment ack) {
    messageProcessor.addMessageBatchToExecutor(list);
    // Throttle: wait until the executor's backlog drops below the threshold.
    while (messageProcessor.getTaskSize() > EXECUTOR_TASK_COUNT_THRESHOLD) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Log the exception itself (getCause() may be null), restore the interrupt flag, and stop waiting.
            LOGGER_ERROR.error(ExceptionUtils.getStackTrace(e));
            Thread.currentThread().interrupt();
            break;
        }
    }
}
The exceptions I am getting look like this:
nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.apache.avro.generic.GenericData$Record] to type [org.springframework.kafka.support.Acknowledgment]
at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:46)
at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:191)
at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:174)
at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:66)
The Kafka messages are Avro messages, and I would like to retrieve them as JSON strings. Is there a ready-to-use Avro converter for GenericData.Record that I can plug into the ConcurrentKafkaListenerContainerFactory? Thanks!
Just add the property below to your Kafka consumer configs:
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
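For context, here is a minimal sketch of where that property might sit among the other consumer settings. It uses the plain string keys behind the config constants (`specific.avro.reader` is the key behind `SPECIFIC_AVRO_READER_CONFIG`), so it depends only on the JDK. The class name `AvroConsumerProps`, the group id, and the addresses are placeholders rather than values from the question, and it assumes Confluent's KafkaAvroDeserializer with a schema registry is in use.

```java
import java.util.HashMap;
import java.util.Map;

class AvroConsumerProps {

    // Builds consumer properties using the literal keys behind the config constants.
    static Map<String, Object> build(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "avro-batch-consumer"); // hypothetical group id
        // Assumed deserializers: String keys, Confluent Avro values.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", schemaRegistryUrl);
        // The property from the answer above.
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses for a local broker and schema registry.
        Map<String, Object> props = build("localhost:9092", "http://localhost:8081");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

A map like this could be passed to `new DefaultKafkaConsumerFactory<>(props)`, as in the question's `consumerFactory()`. Note that with `specific.avro.reader` enabled, the deserializer returns generated SpecificRecord classes rather than GenericData.Record, so the listener's parameter type would likely need to change to match.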