Reading the same message several times from Kafka

I use the Spring Kafka API to implement a Kafka consumer with manual offset management:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

Here, I want the consumer to commit the offset only if someCondition holds. Otherwise, the consumer should sleep for some time and then read the same message again.

Kafka Configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

With the current configuration, if someCondition == false, the consumer doesn't commit the offset but still goes on to read the next messages. Is there a way to make the consumer reread a message if the Kafka acknowledgment wasn't performed?

Aliaxander asked Dec 14 '22 03:12


2 Answers

As @Gary already pointed out, you are heading in the right direction: seek() is the way to do it. I couldn't find a code example when I ran into this problem today, so here is one for anyone who needs to solve it.

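import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.Acknowledgment;

public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {

    // Callback supplied by the container; used to rewind the consumer.
    private ConsumerSeekCallback consumerSeekCallback;

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
        if (/* some condition */) {
            // process the record, then commit its offset
            acknowledgment.acknowledge();
        } else {
            // rewind to this record so that it is delivered again
            consumerSeekCallback.seek(record.topic(), record.partition(), record.offset());
        }
    }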

    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
        this.consumerSeekCallback = consumerSeekCallback;
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

}
Arjit answered Dec 25 '22 08:12


You can stop and restart the container, and the unacknowledged message will be re-sent.
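To illustrate why a restart helps, here is a minimal plain-Java model (not Spring Kafka code). It assumes a single partition and a committed offset that starts at 0; ConsumerModel and its methods are hypothetical names invented for this sketch. The point it tries to show is that fetching advances an in-memory position regardless of acknowledgments, while a restart resumes from the last committed offset.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of one consumer reading one partition.
class ConsumerModel {
    private final List<String> log; // stand-in for the partition's records
    private long position = 0;      // next offset to fetch (in memory only)
    private long committed = 0;     // last committed offset (survives a restart)

    ConsumerModel(List<String> log) {
        this.log = log;
    }

    // Returns the next record, or null at the end of the log.
    // The position advances whether or not the record is acknowledged.
    String poll() {
        return position < log.size() ? log.get((int) position++) : null;
    }

    // Models an acknowledgment: commits everything fetched so far.
    void acknowledge() {
        committed = position;
    }

    // Models stopping and starting the container: fetching resumes
    // from the committed offset, so unacknowledged records come back.
    void restart() {
        position = committed;
    }

    public static void main(String[] args) {
        ConsumerModel consumer = new ConsumerModel(Arrays.asList("m0", "m1", "m2"));
        System.out.println(consumer.poll()); // m0, not acknowledged
        System.out.println(consumer.poll()); // m1 is delivered anyway
        consumer.restart();
        System.out.println(consumer.poll()); // m0 again
    }
}
```

In this model the second poll() returns the next message even though the first was never acknowledged, which matches the behaviour described in the question.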

With the upcoming 1.1 release, you can seek to the required offset and it will be resent.

But you will still see later messages first if they have already been retrieved, so you will have to discard those too.

The second milestone has that feature and we expect it to be released next week.
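The discard step mentioned above could be sketched roughly as follows. This is plain Java with no Kafka dependency; RetryFilter, markForRetry and shouldProcess are hypothetical names, and the sketch assumes a single topic, so the partition number alone identifies where a record came from. The listener would call markForRetry when it rejects a record and requests the seek, and would call shouldProcess before handling each record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: remembers which offset is being re-read per partition
// so that records fetched before the seek took effect can be skipped.
class RetryFilter {

    // partition -> offset we are waiting to see again
    private final Map<Integer, Long> pendingRetry = new HashMap<>();

    // Call when a record is rejected and a seek back to it is requested.
    void markForRetry(int partition, long offset) {
        pendingRetry.put(partition, offset);
    }

    // Returns true if the record should be handled, false if it should be discarded.
    boolean shouldProcess(int partition, long offset) {
        Long expected = pendingRetry.get(partition);
        if (expected == null) {
            return true; // no retry pending for this partition
        }
        if (offset == expected) {
            pendingRetry.remove(partition); // the sought record has come back
            return true;
        }
        return false; // already-fetched record; it will be redelivered after the seek
    }

    public static void main(String[] args) {
        RetryFilter filter = new RetryFilter();
        filter.markForRetry(0, 5L);                      // record 5 was rejected
        System.out.println(filter.shouldProcess(0, 6L)); // false: discard
        System.out.println(filter.shouldProcess(0, 5L)); // true: retried record
        System.out.println(filter.shouldProcess(0, 6L)); // true: redelivered in order
    }
}
```

If the retried record is rejected again, the listener would simply call markForRetry once more before seeking.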

Gary Russell answered Dec 25 '22 10:12