I am writing a Java-based Kafka consumer application using kafka-clients, Spring Kafka and Spring Boot. While Spring Boot lets me write Kafka consumers easily (without explicitly defining the ConcurrentKafkaListenerContainerFactory, ConsumerFactory, etc.), I want to be able to customize some of the properties for these consumers. However, I could not find an easy way to do that with Spring Boot. For example, some of the properties I would like to set are:
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
I took a look at the Spring Boot pre-defined properties here.
Also, based on an earlier question here, I want to set up concurrency on the consumers, but I cannot find a configuration-driven (application.properties) way to do that using Spring Boot.
An obvious way is to define the ConcurrentKafkaListenerContainerFactory and ConsumerFactory beans again in my Spring context and work from there. I wanted to understand whether there is a cleaner way of doing that, especially since I am using Spring Boot.
Versions-
At the URL you cited, scroll down to
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
Regarding spring-kafka - 1.1.0.RELEASE: I recommend upgrading to at least 1.3.5; it has a much simpler threading model, thanks to KIP-62.
EDIT
With Boot 2.0, you can set arbitrary producer, consumer, admin, and common properties, as described in the Boot documentation. For example:
spring.kafka.consumer.properties.heartbeat.interval.ms
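To make the naming convention concrete, here is a rough sketch of how keys under spring.kafka.consumer.properties. end up as plain Kafka consumer property names (the part after the prefix is the Kafka key). This is only an illustration using plain Java maps, not Boot's actual binding code; the class name, method name and sample values are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConsumerPropertyPassThrough {

    // Prefix Boot 2.0 uses for arbitrary consumer properties.
    static final String PREFIX = "spring.kafka.consumer.properties.";

    /** Returns the entries under PREFIX with the prefix stripped; other keys are skipped. */
    static Map<String, String> toKafkaConsumerProps(Map<String, String> applicationProps) {
        Map<String, String> kafkaProps = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : applicationProps.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                kafkaProps.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return kafkaProps;
    }

    public static void main(String[] args) {
        // Hypothetical application.properties content, held in a map for the demo.
        Map<String, String> app = new LinkedHashMap<>();
        app.put(PREFIX + "max.partition.fetch.bytes", "2097152");
        app.put(PREFIX + "partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // A listener container setting, not a Kafka client property, so it is skipped.
        app.put("spring.kafka.listener.concurrency", "3");
        System.out.println(toKafkaConsumerProps(app));
    }
}
```

So the two properties from the question would be written as spring.kafka.consumer.properties.max.partition.fetch.bytes and spring.kafka.consumer.properties.partition.assignment.strategy.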
With Boot 1.5 there is only spring.kafka.properties, as described here.
This sets the properties for both producers and consumers, but you may see some noise in the log about unused/unsupported properties for the producer.
Alternatively, you can simply override Boot's consumer factory and add properties as needed...
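// Replaces Boot's auto-configured consumer factory bean.
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
    // Start from the consumer settings Boot derived from spring.kafka.*
    Map<String, Object> consumerProps = properties.buildConsumerProperties();
    // Add or override any Kafka consumer property as needed.
    consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
    return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}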
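The same override pattern would cover the two properties from the question. Below is a minimal sketch of just the map-merging step, written with plain string keys so it runs without Kafka or Spring on the classpath. The strings are the values of ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG and ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; the class name, the helper name and the chosen values (2 MiB, RoundRobinAssignor) are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerOverrides {

    // String values of the corresponding ConsumerConfig constants in kafka-clients.
    static final String MAX_PARTITION_FETCH_BYTES = "max.partition.fetch.bytes";
    static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy";

    /** Copies the Boot-derived defaults and layers the custom settings on top. */
    static Map<String, Object> withOverrides(Map<String, Object> bootDefaults) {
        Map<String, Object> props = new HashMap<>(bootDefaults);
        props.put(MAX_PARTITION_FETCH_BYTES, 2 * 1024 * 1024); // 2 MiB, illustrative value
        props.put(PARTITION_ASSIGNMENT_STRATEGY,
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        // Stand-in for what properties.buildConsumerProperties() would return.
        Map<String, Object> bootDefaults = new HashMap<>();
        bootDefaults.put("bootstrap.servers", "localhost:9092");
        bootDefaults.put("group.id", "demo-group");
        System.out.println(withOverrides(bootDefaults));
    }
}
```

In the real bean, the input map would come from properties.buildConsumerProperties() and the result would be passed to new DefaultKafkaConsumerFactory, as in the snippet above.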