Consuming again messages from kafka log compaction topic

I have a Spring application with a Kafka consumer using a @KafkaListener annotation. The topic being consumed is log compacted, and we might hit a scenario where we must consume the topic's messages again. What's the best way to achieve this programmatically? We don't control the Kafka topic configuration.

asked Oct 23 '25 by cardosojc

1 Answer

You can inject the `Consumer` into the listener method and call `seekToBeginning()` when a flag indicates a replay is needed:

    @KafkaListener(...)
    public void listen(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        System.out.println(in);
        if (this.resetNeeded) { // boolean field, set when a replay is required
            consumer.seekToBeginning(consumer.assignment());
            this.resetNeeded = false;
        }
    }

If you want to reset when the listener is idle (no records), you can enable idle events and perform the seeks from an ApplicationListener or @EventListener method that listens for a ListenerContainerIdleEvent.

The event has a reference to the consumer.

EDIT

@SpringBootApplication
public class So58769796Application {

    public static void main(String[] args) {
        SpringApplication.run(So58769796Application.class, args);
    }

    @KafkaListener(id = "so58769796", topics = "so58769796")
    public void listen1(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("One:" + key + ":" + value);
    }

    @KafkaListener(id = "so58769796a", topics = "so58769796")
    public void listen2(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("Two:" + key + ":" + value);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58769796")
                .compact()
                .partitions(1)
                .replicas(1)
                .build();
    }

    boolean reset;

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so58769796", "foo", "bar");
            System.out.println("Hit enter to rewind");
            System.in.read();
            this.reset = true;
        };
    }

    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        System.out.println(event);
        if (this.reset && event.getListenerId().startsWith("so58769796-")) {
            event.getConsumer().seekToBeginning(event.getConsumer().assignment());
        }
    }

}

and

spring.kafka.listener.idle-event-interval=5000
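If you are not using Spring Boot's property support, the idle interval can also be set on the listener container factory directly. A minimal sketch, assuming the usual ConcurrentKafkaListenerContainerFactory setup (the bean name and generic types here are illustrative, not from the original answer):

    // Equivalent of spring.kafka.listener.idle-event-interval=5000,
    // configured programmatically on the container factory.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Publish a ListenerContainerIdleEvent after 5 seconds with no records
        factory.getContainerProperties().setIdleEventInterval(5000L);
        return factory;
    }

This is a Spring configuration fragment; it needs the spring-kafka imports and a ConsumerFactory bean in context.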

EDIT2

Here's another technique - in this case we rewind each time the app starts (and on demand, by stopping and restarting the container):

@SpringBootApplication
public class So58769796Application implements ConsumerSeekAware {

    public static void main(String[] args) {
        SpringApplication.run(So58769796Application.class, args);
    }

    @KafkaListener(id = "so58769796", topics = "so58769796")
    public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println(key + ":" + value);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58769796")
                .compact()
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            KafkaListenerEndpointRegistry registry) {

        return args -> {
            template.send("so58769796", "foo", "bar");
            System.out.println("Hit enter to rewind");
            System.in.read();
            registry.getListenerContainer("so58769796").stop();
            registry.getListenerContainer("so58769796").start();
        };

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}
answered Oct 26 '25 by Gary Russell