Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to filter Kafka messages before consumer consume in spring Kafka

I am using spring Kafka in my project and I want to filter messages before consumer consumes based on key and value.

Is it possible?

like image 651
app Avatar asked Jul 25 '18 04:07

app


2 Answers

Yes in spring Kafka you can filter messages before consumer consumes, there is an interface public interface RecordFilterStrategy<K,V> and method in interface boolean filter(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> consumerRecord)

so you need to override this filter method and if it returns false consumer will consume the message, and if it return true message will not consume

You can apply this filtration on message as well as message values

consumerRecord.key() // will return key of message
consumerRecord.value() // will return the message

Example code:

 @Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(Integer.parseInt(threads));
    factory.setBatchListener(true);
    factory.setConsumerFactory(kafkaConsumerFactory());
    factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);

    if(true) {
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {

            @Override
            public boolean filter(ConsumerRecord<String, String> consumerRecord) {
                if(consumerRecord.key().equals("ETEST")) {
                return false;
                }
            else {
                return true;
                 }
            }   
        });
    }

    return factory;
}
like image 65
Deadpool Avatar answered Sep 21 '22 16:09

Deadpool


adding to @Deadpool comments. It will work fine but it will not commit offset. so we will keep getting same message again but it will not consume. we need to set factory.setAckDiscarded(true); before setting factory.setRecordFilterStrategy() so that it will discard and commit offset.

like image 28
Sreenivasa D Avatar answered Sep 18 '22 16:09

Sreenivasa D