
Spring @KafkaListener execute and poll records after certain interval

We wanted to consume the records after a certain interval (e.g. every 5 minutes). Consumer properties are standard:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(300000);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    return factory;
}

Even after changing setPollTimeout, the consumer does not poll at the configured interval (5 minutes); it keeps polling every 30 seconds. Here are my logs:

2018-01-23 18:07:26.875 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 2

2018-01-23 18:07:56.901 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 4

We are building a Kafka Streams application with windowed aggregations and plan to consume window x after interval y.

I can see that in the class: KafkaMessageListenerContainer, setConsumerTaskExecutor is set:

if (containerProperties.getConsumerTaskExecutor() == null) {
    SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
            (getBeanName() == null ? "" : getBeanName()) + "-C-");
    containerProperties.setConsumerTaskExecutor(consumerExecutor);
}

But how do we configure how often this consumer thread polls for records? Any help appreciated.

Sandeep B asked Jan 23 '18 12:01


2 Answers

You cannot control the rate at which the consumer polls. The pollTimeout is only how long poll() will wait for new records to arrive; if records arrive sooner, it returns without waiting that long.

If you wish to control the rate at which you receive records, simply use the DefaultKafkaConsumerFactory to create a consumer and poll it whenever you want.

You can't use that with a @KafkaListener though - you have to deal with the record yourself.
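As a rough illustration of "poll it whenever you want", here is a minimal sketch of a fixed-delay poller in plain Java. PeriodicPoller, pollFn and handler are hypothetical names, not part of Spring Kafka; pollFn stands in for a call to poll() on a consumer you created yourself from the DefaultKafkaConsumerFactory.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper: invokes a poll function at a fixed delay on a single thread. */
public class PeriodicPoller<T> implements AutoCloseable {

    // One thread only, so the wrapped consumer is never polled concurrently.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Function<Duration, List<T>> pollFn;   // assumption: wraps consumer.poll(timeout)
    private final Consumer<List<T>> handler;            // java.util.function.Consumer, not a Kafka consumer
    private final Duration pollTimeout;

    public PeriodicPoller(Function<Duration, List<T>> pollFn,
                          Consumer<List<T>> handler,
                          Duration pollTimeout) {
        this.pollFn = pollFn;
        this.handler = handler;
        this.pollTimeout = pollTimeout;
    }

    /** Polls immediately, then again 'interval' after each poll completes. */
    public void start(Duration interval) {
        scheduler.scheduleWithFixedDelay(
                this::pollOnce, 0L, interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** One poll cycle: fetch a batch and pass it to the handler if it is non-empty. */
    public void pollOnce() {
        try {
            List<T> batch = pollFn.apply(pollTimeout);
            if (!batch.isEmpty()) {
                handler.accept(batch);
            }
        } catch (RuntimeException e) {
            // An uncaught exception would cancel all further scheduled runs.
            System.err.println("poll failed: " + e);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

With a real consumer, the interval between polls has to stay below max.poll.interval.ms, otherwise the broker treats the consumer as failed and rebalances the group.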

Gary Russell answered Sep 27 '22 20:09


This feature was introduced in Spring Kafka version 2.3. From the reference documentation:

Starting with version 2.3, the ContainerProperties provides an idleBetweenPolls option that lets the main loop in the listener container sleep between KafkaConsumer.poll() calls. The actual sleep interval is the minimum of the provided option and the difference between the max.poll.interval.ms consumer config and the current records batch processing time.

https://docs.spring.io/spring-kafka/reference/html/
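To make the "minimum of" rule concrete, here is a small sketch of the arithmetic described in the quoted passage. This is not the container's actual code; the class and method names are hypothetical, and clamping negative values to zero is my own assumption.

```java
/** Hypothetical sketch of the sleep-interval rule quoted from the docs. */
public final class IdleBetweenPollsSketch {

    private IdleBetweenPollsSketch() {
    }

    /**
     * @param idleBetweenPollsMs value passed to setIdleBetweenPolls
     * @param maxPollIntervalMs  the consumer's max.poll.interval.ms
     * @param batchProcessingMs  time spent processing the last batch
     * @return the time the container would sleep before the next poll
     */
    public static long effectiveSleepMillis(long idleBetweenPollsMs,
                                            long maxPollIntervalMs,
                                            long batchProcessingMs) {
        long remaining = maxPollIntervalMs - batchProcessingMs;
        // Assumption: never return a negative sleep time.
        return Math.max(0L, Math.min(idleBetweenPollsMs, remaining));
    }

    public static void main(String[] args) {
        // Short idle time: used as-is.
        System.out.println(effectiveSleepMillis(100, 300_000, 2_000));     // prints 100
        // 5-minute idle with a 5-minute max.poll.interval.ms: trimmed.
        System.out.println(effectiveSleepMillis(300_000, 300_000, 2_000)); // prints 298000
    }
}
```

So for the 5-minute interval in the question, max.poll.interval.ms would presumably need to be raised above 5 minutes for the full idle time to apply.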

KafkaListenerConfig.java

package br.com.sicredi.spi.icom.consumer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        factory.getContainerProperties().setIdleBetweenPolls(100); // sleep 100 milliseconds between polls
        
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        // ... 
        return props;
    }
}

Gabriel C. Stabel answered Sep 27 '22 20:09