I'm trying to implement a basic scenario, re-reading a topic from the beginning (at least one message), and I'm running into unexpected behavior.
Suppose there is a topic with 1 partition holding exactly 1 million messages, 1 consumer whose committed offset is somewhere in the middle, and no active producers.
First I tried:
consumer.subscribe(Collections.singletonList(topic));
consumer.seekToBeginning(Collections.emptySet());
consumer.poll(Duration.ofMillis(longTimeout)); //no loop to simplify
And that doesn't work (no messages are polled). I've read that seekToBeginning is lazy (and that's fine), but it turns out it has no effect here at all, because it needs partitions to be already assigned, which only happens on the first poll. Should this be described in the docs, or have I missed it?
Then I tried:
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(Duration.ofMillis(assignTimeout));
consumer.seekToBeginning(Collections.emptySet());
consumer.poll(Duration.ofMillis(longTimeout));//no loop to simplify
And it turns out this depends on assignTimeout: it has to be long enough for the group join to complete. That time varies, so it's not something I can rely on.
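(One way to avoid guessing a timeout would be to poll with a short timeout until the assignment shows up; this is a rough sketch, not what I actually ended up using. consumer, topic and longTimeout are the same placeholders as above.)
consumer.subscribe(Collections.singletonList(topic));
while (consumer.assignment().isEmpty()) {
    consumer.poll(Duration.ofMillis(100)); // drives the group join; any records returned here are simply discarded
}
consumer.seekToBeginning(consumer.assignment());
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(longTimeout));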
Then I provided a ConsumerRebalanceListener with
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
and a single poll after subscribe. And it finally seems to work.
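For completeness, here is the full flow as a sketch (consumer configuration, imports and the String generics are my assumptions here; topic and longTimeout are placeholders as above):
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do before giving up partitions in this scenario
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // invoked inside poll(), after the assignment is received but before records are returned
        consumer.seekToBeginning(partitions);
    }
});
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(longTimeout)); // no loop to simplify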
So the questions are:
1. Is seekToBeginning right after subscribe useless? Should that be documented?
2. Is ConsumerRebalanceListener reliable? Does it guarantee that no messages from the middle (the committed offset) will be polled before the seek applies?
For the first one:
You've rightly mentioned in your question that the prerequisite for seek() or seekToXXX() operations is that the partitions must already be assigned. That will not happen until the consumer joins the consumer group, which happens only when we call poll(). So seek() not working immediately after subscribe() is the expected behaviour.
This is actually documented in Kafka: The Definitive Guide, Chapter 4 (Kafka Consumers), section "Consuming Records with Specific Offsets".
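A minimal sketch illustrating this (assuming an already configured consumer and an existing topic):
consumer.subscribe(Collections.singletonList(topic));
System.out.println(consumer.assignment()); // [] - no partitions assigned yet, so there is nothing for seekToBeginning to act on

consumer.poll(Duration.ofMillis(5000));    // poll() drives the group join and receives the assignment
System.out.println(consumer.assignment()); // typically prints the assigned TopicPartition(s) now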
For the second question:
Yes, using a ConsumerRebalanceListener is reliable and is the recommended approach as per Kafka: The Definitive Guide.
Here's the statement from the same chapter that confirms this:
There are many different ways to implement exactly-once semantics [...], but all of them will need to use the ConsumerRebalanceListener and seek() to make sure offsets are stored in time and that the consumer starts reading messages from the correct location.
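A rough sketch of that pattern (the in-memory map standing in for an external offset store is my own simplification, not something the book prescribes):
Map<TopicPartition, Long> processedOffsets = new HashMap<>(); // offset of the next record to read, per partition

consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // persist the offsets of fully processed records before losing ownership,
        // e.g. write the processedOffsets entries to the same store as the processed results
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // resume from our own store rather than from Kafka's committed offset
        for (TopicPartition tp : partitions) {
            Long offset = processedOffsets.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset);
            }
        }
    }
});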
Hope this helps!