
How to configure an UncaughtExceptionHandler in Spring Kafka Stream

I am using spring-kafka to implement a stream application with Spring Boot 1.5.16. The version of spring-kafka we are using is 1.3.8.RELEASE.

I am looking for a way to shut down the Boot application when an error terminates all the threads associated with Kafka Streams. I found that KafkaStreams lets you register a handler for uncaught exceptions. The method is setUncaughtExceptionHandler.

I saw that this method is exposed inside spring-kafka in the type KStreamBuilderFactoryBean.

My question is the following. Is there a simple way to register the UncaughtExceptionHandler as a bean and let Spring inject it into the factory bean? Or should I create the KStreamBuilderFactoryBean on my own and set the handler manually?

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
    final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
            streamsConfig);
    streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
        // Something happens here
    });
    return streamBuilderFactoryBean;
}

Thanks a lot.

riccardo.cardin asked Dec 04 '25 15:12


1 Answer

Yes. In that old version you have to declare the KStreamBuilderFactoryBean bean yourself, with the appropriate injections and with exactly that KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME bean name.
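As for what the handler body might do, here is a minimal JDK-only sketch. It assumes the handler is a plain Thread.UncaughtExceptionHandler, which is what the two-argument lambda in the question implies. The names UncaughtHandlerSketch and ShutdownOnError are hypothetical, and the worker thread only stands in for a stream thread; none of this is Kafka Streams or spring-kafka code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class UncaughtHandlerSketch {

    // Hypothetical handler: remembers the first failure and releases
    // whichever thread is waiting to start the shutdown.
    public static final class ShutdownOnError implements Thread.UncaughtExceptionHandler {
        private final CountDownLatch failed = new CountDownLatch(1);
        private final AtomicReference<Throwable> cause = new AtomicReference<>();

        @Override
        public void uncaughtException(Thread threadInError, Throwable exception) {
            cause.compareAndSet(null, exception); // keep only the first failure
            failed.countDown();
        }

        // Returns true if a failure was reported within the timeout.
        public boolean awaitFailure(long timeout, TimeUnit unit) throws InterruptedException {
            return failed.await(timeout, unit);
        }

        public Throwable cause() {
            return cause.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ShutdownOnError handler = new ShutdownOnError();

        // Stand-in for a stream thread that dies with an unhandled exception.
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("stream thread died");
        }, "stream-thread-1");
        worker.setUncaughtExceptionHandler(handler);
        worker.start();

        if (handler.awaitFailure(5, TimeUnit.SECONDS)) {
            // Assumption: a real application would close its context here.
            System.out.println("Shutting down after: " + handler.cause().getMessage());
        }
    }
}
```

The handler only records the failure and signals it; the actual shutdown happens on the waiting thread, so the dying thread is not asked to close the application it belongs to.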

Later versions already provide a StreamsBuilderFactoryBeanConfigurer, so you can keep the auto-configured StreamsBuilderFactoryBean and still modify it in whatever way you need.

UPDATE

You just declare the configurer as a bean in your application context, and the framework picks it up and applies it to the StreamsBuilderFactoryBean:

    @Bean
    StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
        return new StreamsBuilderFactoryBeanConfigurer() {

            @Override
            public void configure(StreamsBuilderFactoryBean factoryBean) {
                factoryBean.setCloseTimeout(...);
            }

            @Override
            public int getOrder() {
                return Integer.MAX_VALUE;
            }

        };
    }
Artem Bilan answered Dec 07 '25 03:12


