I am using Spring Kafka in my project, and I want to filter messages based on their key and value before the consumer receives them.
Is that possible?
Yes, Spring Kafka lets you filter messages before the listener consumes them. The framework provides the interface

public interface RecordFilterStrategy<K, V>

with a single method:

boolean filter(org.apache.kafka.clients.consumer.ConsumerRecord<K, V> consumerRecord)

Implement this method so that it returns false for records the listener should consume and true for records that should be discarded.
You can filter on the message key as well as the message value:
consumerRecord.key()   // returns the key of the message
consumerRecord.value() // returns the value (payload) of the message
Example code:
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(Integer.parseInt(threads));
    factory.setBatchListener(true);
    factory.setConsumerFactory(kafkaConsumerFactory());
    factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
        @Override
        public boolean filter(ConsumerRecord<String, String> consumerRecord) {
            // Return false to consume the record, true to discard it;
            // here only records whose key is "ETEST" are consumed.
            // "ETEST".equals(...) is null-safe for records with no key.
            return !"ETEST".equals(consumerRecord.key());
        }
    });
    return factory;
}
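As a usage sketch (the topic name and listener method below are hypothetical, not part of the original answer), a listener wired to this factory only sees the records the filter lets through. Because the factory sets setBatchListener(true), the listener receives a batch as a List:

```java
// Hypothetical listener; "etest-topic" is an assumed topic name.
// Only records with key "ETEST" reach this method.
@KafkaListener(topics = "etest-topic", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<String> messages) {
    messages.forEach(System.out::println);
}
```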
Adding to @Deadpool's answer: the filter works, but offsets for discarded records are not committed, so the broker keeps redelivering the same messages even though they are never consumed. Call

factory.setAckDiscarded(true);

before factory.setRecordFilterStrategy(...) so that discarded records are acknowledged and their offsets committed.
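Putting the two answers together, a minimal sketch of the relevant part of the factory configuration (using the same "ETEST" key check as above; RecordFilterStrategy is a functional interface, so a lambda works) would be:

```java
// Acknowledge (and commit offsets for) records the filter discards,
// so they are not redelivered on the next poll.
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(record -> !"ETEST".equals(record.key()));
```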