Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Filter messages before deserialization based on headers

Sometimes messages can be filtered out before the deserialization based on header values . Are there any existing patterns for this scenario using spring kafka. I am thinking implementing similar to ErrorHandlingDeserializer in addition to delegate take filter predicate also as property. Any suggestions? thanks.

like image 836
kamal Avatar asked Jan 23 '26 18:01

kamal


1 Answers

Yes, you can use the same technique used by the ErrorHandlingDeserializer to return a "marker" object instead of doing the deserialization, then add a RecordFilterStrategy, that filters records with such objects, to the listener (container factory when using @KafkaListener or use a filtering adapter for an explicit listener).

EDIT

Spring Boot and adding a filter...

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        kafkaConsumerFactory.setRecordFilterStrategy(myFilter());
        return factory;
    }
like image 150
Gary Russell Avatar answered Jan 25 '26 17:01

Gary Russell



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!