
ConcurrentMessageListenerContainer isn't acting concurrent

I'm trying to create a multi-threaded listener, but all of the messages are processed on the same thread. When I run it, the thread ID is always the same, even though the KafkaListenerContainerFactory is (correctly) the one that I instantiated. If I send 7 messages nearly simultaneously, I would expect the first three to be processed concurrently, then the next three concurrently, and then the last one. Instead, the first message is processed to completion, then the second, then the third, and so on. Am I misunderstanding something, or is this a misconfiguration?

This is my listener:

@Component
public class ExampleKafkaController {
    Log log = Log.getLog(ExampleKafkaController.class);

    @Autowired
    private KafkaListenerContainerFactory kafkaListenerContainerFactory;

    @KafkaListener(topics = "${kafka.example.topic}")
    public void listenForMessage(ConsumerRecord<?, ?> record) {
        log.info("Got record:\n" + record.value());
        System.out.println("Kafka Thread: " + Thread.currentThread());
        System.out.println(kafkaListenerContainerFactory);

        log.info("Waiting...");
        waitSleep(10000);

        log.info("Done!");
    }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.example.topic}")
    public String topic;    

    public void send(String payload) {
        log.info("sending payload='" + payload + "' to topic='" + topic + "'");
        kafkaTemplate.send(topic, payload);
    }

    private void waitSleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the listener thread can still be stopped cleanly
            Thread.currentThread().interrupt();
        }
    }
}

And this is my application with the new config:

@SpringBootApplication
@ComponentScan("net.reigrut.internet.services.example.*")
@EntityScan("net.reigrut.internet.services.example.*")
@EnableKafka
@Configuration
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired 
    ConsumerFactory<Integer,String> consumerFactory;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        System.out.println("===========>" + consumerFactory);
        System.out.println(factory);
        return factory;
    }
}
cmreigrut asked Jan 21 '26 15:01


1 Answer

With Kafka, concurrency is limited to the number of partitions in the topic. If there is only one partition, you will only receive messages on one thread, regardless of the container's concurrency setting.

You should provision the number of partitions to be greater than or equal to the concurrency you desire. If the number of partitions is greater than the concurrency, the partitions will be distributed across the consumer threads.
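As a rough sketch of provisioning enough partitions, a topic bean along these lines could be declared. It assumes Spring Boot's auto-configured KafkaAdmin is active (it creates topics from NewTopic beans at startup), reuses the kafka.example.topic property from the question, and picks 3 partitions to match the concurrency of 3. The class name ExampleTopicConfig and the replication factor of 1 (a single-broker development setup) are assumptions for illustration.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class ExampleTopicConfig {

    // Declares the topic with 3 partitions so that a container
    // concurrency of 3 can give each consumer thread its own partition.
    // Replication factor 1 is an assumption for a single-broker setup.
    @Bean
    public NewTopic exampleTopic(@Value("${kafka.example.topic}") String topic) {
        return new NewTopic(topic, 3, (short) 1);
    }
}
```

The partition count of an existing topic can be checked with the kafka-topics command-line tool's --describe option before relying on this.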

Within a consumer group, only one consumer can consume from a given partition at a time, so any container threads beyond the number of partitions sit idle.
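To make the effect concrete, here is a small stand-alone simulation that uses no Kafka classes. It splits the partitions of one topic across consumer threads in contiguous ranges, which loosely mirrors range-style assignment; it is a simplified illustration, not the actual assignor code, and the class and method names are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: which partitions each consumer thread would own.
public class PartitionAssignmentSketch {

    // Splits partitions 0..partitions-1 into contiguous ranges, one per consumer.
    // When the split is uneven, the first consumers receive one extra partition.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // One partition, concurrency 3: only the first thread has work.
        System.out.println(assign(1, 3));
        // Three partitions, concurrency 3: one partition per thread.
        System.out.println(assign(3, 3));
        // Seven partitions, concurrency 3: partitions are shared out.
        System.out.println(assign(7, 3));
    }
}
```

With a single partition, two of the three threads own nothing, which matches the behavior described in the question: every record arrives on the same thread no matter what concurrency is set to.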

Gary Russell answered Jan 23 '26 10:01


