I'd like to implement a stateful listener using the Spring Kafka API.
Given the following:
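(Roughly a setup like this - the class names, topic, group id, and the concurrency value 3 are only placeholders for illustration:)

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3); // "n" KafkaMessageListenerContainers, one consumer thread each
        return factory;
    }

    // a single listener bean shared by all "n" consumer threads
    public static class MyListener {

        @KafkaListener(topics = "my-topic", groupId = "my-group")
        public void listen(String message) {
            // any state held in this instance is touched by "n" threads
        }

    }

    @Bean
    public MyListener myListener() {
        return new MyListener();
    }

}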
Then "n" KafkaMessageListenerContainers will be created. Each one of these will have its own KafkaConsumer, and hence there will be "n" consumer threads - one per consumer.
When messages are consumed, the @KafkaListener method will be called using the same thread that polled the underlying KafkaConsumer. Since there is only on instance of the listener, this listener needs to be thread safe since there will be concurrent access from "n" threads.
I'd like not to think about concurrent access, and hold state in a listener which I know will only ever be accessed by one thread.
How can you create a separate listener per Kafka consumer using the Spring Kafka API?
You are correct; there is one listener instance per container (regardless of whether it is configured as a @KafkaListener or a MessageListener).
One workaround is to use a prototype-scoped MessageListener with "n" KafkaMessageListenerContainer beans (each having 1 thread); each container then gets its own instance of the listener (see the sketch below).
That is not possible with the @KafkaListener POJO abstraction.
It's generally better to use stateless beans, however.
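A rough sketch of that first workaround, assuming a recent spring-kafka version; the bean names, topic, group id, and the stateful listener itself are illustrations, not from your question:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

@Configuration
public class PrototypeListenerConfig {

    // Prototype scope: every container below gets its own listener instance,
    // so per-listener state is confined to a single consumer thread.
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public StatefulListener statefulListener() {
        return new StatefulListener();
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> container0(
            ConsumerFactory<String, String> cf, ObjectProvider<StatefulListener> listeners) {
        return container(cf, listeners.getObject());
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> container1(
            ConsumerFactory<String, String> cf, ObjectProvider<StatefulListener> listeners) {
        return container(cf, listeners.getObject());
    }

    private KafkaMessageListenerContainer<String, String> container(
            ConsumerFactory<String, String> cf, StatefulListener listener) {
        ContainerProperties props = new ContainerProperties("so51658210");
        props.setGroupId("so51658210");
        props.setMessageListener(listener);
        // each KafkaMessageListenerContainer runs a single consumer thread
        return new KafkaMessageListenerContainer<>(cf, props);
    }

    public static class StatefulListener implements MessageListener<String, String> {

        private int count; // only ever touched by this container's consumer thread

        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            this.count++;
            System.out.println(record.value() + " (" + this.count + " on "
                    + Thread.currentThread().getName() + ")");
        }

    }

}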
EDIT
I found another workaround, using a SimpleThreadScope ...
@SpringBootApplication
public class So51658210Application {

    public static void main(String[] args) {
        SpringApplication.run(So51658210Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
            KafkaListenerEndpointRegistry registry) {
        return args -> {
            // send two records to each of the 3 partitions
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
        };
    }

    @Bean
    public ActualListener actualListener() {
        return new ActualListener();
    }

    @Bean
    @Scope("threadScope")
    public ThreadScopedListener listener() {
        return new ThreadScopedListener();
    }

    @Bean
    public static CustomScopeConfigurer scoper() {
        CustomScopeConfigurer configurer = new CustomScopeConfigurer();
        configurer.addScope("threadScope", new SimpleThreadScope());
        return configurer;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so51658210", 3, (short) 1);
    }

    public static class ActualListener {

        @Autowired
        private ObjectFactory<ThreadScopedListener> listener;

        @KafkaListener(id = "foo", topics = "so51658210")
        public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            // each consumer thread gets its own ThreadScopedListener from the thread scope
            this.listener.getObject().doListen(in, partition);
        }

    }

    public static class ThreadScopedListener {

        private void doListen(String in, int partition) {
            System.out.println(in + ":"
                    + Thread.currentThread().getName() + ":"
                    + this.hashCode() + ":"
                    + partition);
        }

    }

}
(Container concurrency is 3).
It works fine:
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
The only problem is the scope doesn't clean up on its own (e.g. when the container is stopped and the thread goes away). That may not be critical, depending on your use case.
To fix that, we'd need some help from the container (e.g. publish an event on the listener thread when it is stopped). GH-762.
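If you are on a version that does publish an event on the consumer thread when it stops (ConsumerStoppedEvent in later versions), a cleanup hook could look something like the sketch below; the class name and wiring are only an illustration, and it reuses the "threadScope"/"listener" names from the example above:

import org.springframework.beans.factory.config.Scope;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.stereotype.Component;

@Component
public class ThreadScopeCleaner implements ApplicationListener<ConsumerStoppedEvent> {

    private final ConfigurableApplicationContext context;

    public ThreadScopeCleaner(ConfigurableApplicationContext context) {
        this.context = context;
    }

    @Override
    public void onApplicationEvent(ConsumerStoppedEvent event) {
        // Application events are multicast synchronously by default, so this runs on the
        // consumer thread; removing the bean discards that thread's listener instance.
        Scope scope = this.context.getBeanFactory().getRegisteredScope("threadScope");
        if (scope != null) {
            scope.remove("listener"); // "listener" is the thread-scoped bean name above
        }
    }

}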