Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement a stateful message listener using Spring Kafka?

I'd like to implement a stateful listener using the Spring Kafka API.

Given the following:

  • A ConcurrentKafkaListenerContainerFactory, with concurrency set to "n"
  • A @KafkaListener annotated method, on a Spring @Service class

Then "n" KafkaMessageListenerContainers will be created. Each one of these will have its own KafkaConsumer, and hence there will be "n" consumer threads - one per consumer.

When messages are consumed, the @KafkaListener method will be called using the same thread that polled the underlying KafkaConsumer. Since there is only on instance of the listener, this listener needs to be thread safe since there will be concurrent access from "n" threads.

I'd like not to think about concurrent access, and hold state in a listener which I know will only ever be accessed by one thread.

How can you create a separate listener per Kafka consumer using the Spring Kafka API?

like image 292
A_M Avatar asked Oct 16 '22 15:10

A_M


1 Answers

You are correct; there is one listener instance per container (regardless of whether is is configured as a @KafkaListener or MessageListener).

One work around is to use a prototype scoped MessageListener with n KafkaMessageListenerContainer beans (each having 1 thread).

Then, each container will get its own instance of the listener.

That is not possible with the @KafkaListener POJO abstraction.

It's generally better to use stateless beans, however.

EDIT

I found another work-around using a SimpleThreadScope...

@SpringBootApplication
public class So51658210Application {

    public static void main(String[] args) {
        SpringApplication.run(So51658210Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
            KafkaListenerEndpointRegistry registry) {
        return args -> {
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
        };
    }

    @Bean
    public ActualListener actualListener() {
        return new ActualListener();
    }

    @Bean
    @Scope("threadScope")
    public ThreadScopedListener listener() {
        return new ThreadScopedListener();
    }

    @Bean
    public static CustomScopeConfigurer scoper() {
        CustomScopeConfigurer configurer = new CustomScopeConfigurer();
        configurer.addScope("threadScope", new SimpleThreadScope());
        return configurer;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so51658210", 3, (short) 1);
    }

    public static class ActualListener {

        @Autowired
        private ObjectFactory<ThreadScopedListener> listener;

        @KafkaListener(id = "foo", topics = "so51658210")
        public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            this.listener.getObject().doListen(in, partition);
        }

    }

    public static class ThreadScopedListener {

        private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println(in + ":"
                    + Thread.currentThread().getName() + ":"
                    + this.hashCode() + ":"
                    + partition);
        }

    }

}

(Container concurrency is 3).

It works fine:

bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2

The only problem is the scope doesn't clean up on its own (e.g. when the container is stopped and the thread goes away. That may not be critical, depending on your use case.

To fix that, we'd need some help from the container (e.g. publish an event on the listener thread when it is stopped). GH-762.

like image 101
Gary Russell Avatar answered Oct 29 '22 02:10

Gary Russell