Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Integration and Kafka: How to filter messages based on message header

I have a question which builds on this question: Filter messages before deserialization based on headers

I'd like to filter by kafka consumer record header using Spring Integration DSL.

Currently I have this flow:

@Bean
IntegrationFlow readTicketsFlow(KafkaProperties kafkaProperties,
                                ObjectMapper jacksonObjectMapper,
                                EventService<Ticket> service) {
    Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

    return IntegrationFlows.from(
            Kafka.messageDrivenChannelAdapter(
                    consumerFactory, TICKET_TOPIC))
            .transform(fromJson(Ticket.class, new Jackson2JsonObjectMapper(jacksonObjectMapper)))
            .handle(service)
            .get();
}

How can I register org.springframework.kafka.listener.adapter.RecordFilterStrategy in this flow?

like image 736
Patrik Mihalčin Avatar asked Sep 02 '25 04:09

Patrik Mihalčin


1 Answers

You can simply add a .filter() element to the flow.

.filter("!'bar'.equals(headers['foo'])")

Will filter out (ignore) any messages with a header named foo equal to bar.

Note Spring Kafka's RecordFilterStrategy has the reverse sense of Spring Integration filters

public interface RecordFilterStrategy<K, V> {

    /**
     * Return true if the record should be discarded.
     * @param consumerRecord the record.
     * @return true to discard.
     */
    boolean filter(ConsumerRecord<K, V> consumerRecord);

}

Spring Integration filters discard messages if the filter returns false.

EDIT

Or you can add a RecordFilterStrategy to the channel adapter.

return IntegrationFlows
        .from(Kafka.messageDrivenChannelAdapter(consumerFactory(), TEST_TOPIC1)
                .recordFilterStrategy(record -> {
                    Header header = record.headers().lastHeader("foo");
                    return header != null ? new String(header.value()).equals("bar") : false;
                })
                ...

like image 179
Gary Russell Avatar answered Sep 05 '25 00:09

Gary Russell