Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

how to set kafka consumer concurrency using spring boot

I am writing a Java based Kafka Consumer application. I am utilizing kafka-clients, Spring Kafka and Spring boot for my application. While Spring boot lets me easily write Kafka Consumers (without really writing the ConcurrentKafkaListenerContainerFactory, ConsumerFactory etc), I want to be able to define / customize some of the properties for these consumers. However, I could not find out an easy way to do it using Spring boot. For eg: some of the properties that I would be interested in setting up are -

ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

I took a look at the Spring Boot pre-defined properties here.

Also, based on an earlier question here, I want to setup the concurrency on the consumers, but cannot find a configuration, application.properties driven way to do that using Spring Boot.

An obvious way is to define the ConcurrentKafkaListenerContainerFactory, ConsumerFactory classes again in my Spring Context and work from there. I wanted to understand if there is a cleaner way of doing that, especially since I am using Spring Boot.

Versions-

  • kafka-clients - 0.10.0.0-SASL
  • spring-kafka - 1.1.0.RELEASE
  • spring boot - 1.5.10.RELEASE
like image 838
user3842182 Avatar asked Jan 02 '23 10:01

user3842182


1 Answers

At the URL you cited, scroll down to

spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.

spring-kafka - 1.1.0.RELEASE

I recommend upgrading to at least 1.3.5; it has a much simpler threading model, thanks to KIP-62.

EDIT

With Boot 2.0, you can set arbitrary producer, consumer, admin, common properties, as described in the boot documentation.

spring.kafka.consumer.properties.heartbeat.interval.ms

With Boot 1.5 there is only spring.kafka.properties as described here.

This sets the properties for both producers and consumers, but you may see some noise in the log about unused/unsupported properties for the producer.

Alternatively, you can simply override Boot's consumer factory and add properties as needed...

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
    Map<String, Object> consumerProps = properties.buildConsumerProperties();
    consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
    return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}
like image 170
Gary Russell Avatar answered Jan 14 '23 12:01

Gary Russell