Say I want to check the offsets of the first and last messages in Kafka for a particular partition. My idea was to use the assign(…) method along with seekToBeginning(…) and seekToEnd(…). Unfortunately, this doesn't work.
If I set AUTO_OFFSET_RESET_CONFIG to "latest", seekToBeginning(…) has no effect; if I set it to "earliest", seekToEnd(…) doesn't work. It seems that the only thing that matters for my consumer is AUTO_OFFSET_RESET_CONFIG.
I've seen a similar topic, but that problem dealt with subscribe(), not with the assign() method. The proposed solution was to implement a ConsumerRebalanceListener and pass it as a parameter to subscribe(). Unfortunately, the assign() method has only one signature and can only take a collection of topic partitions.
The question is: is it possible to use seekToBeginning() or seekToEnd() with the assign() method? If yes, how? If no, why not?
A relevant fragment of my code:
KafkaConsumer<String, ProtoMeasurement> consumer = createConsumer();
TopicPartition zeroP = new TopicPartition(TOPIC, 1);
List<TopicPartition> partitions = Collections.singletonList(zeroP);
consumer.assign(partitions);
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(partitions);
long currOffsetPos = consumer.position(zeroP);
LOGGER.info("Current offset {}.", currOffsetPos);
ConsumerRecords<String, ProtoMeasurement> records = consumer.poll(Duration.ofMillis(100));
// ...
The logger prints the offset n, which is the largest (latest) offset of the considered partition.
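For reference, the seekToBeginning()/seekToEnd() methods are documented as lazily evaluated: they only take effect on the next poll() or position() call. A pattern that exploits this — seek after assign, then let position() force the seek before any records are fetched — might look like the following sketch. The topic partition and deserializer types are placeholders, not taken from the original code:

```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetBounds {
    // Returns the first offset of the partition. seekToBeginning() is lazy,
    // so position() is called immediately to force the seek to be evaluated
    // before any records are fetched.
    static long firstOffset(Consumer<String, byte[]> consumer, TopicPartition tp) {
        List<TopicPartition> tps = Collections.singletonList(tp);
        consumer.assign(tps);
        consumer.seekToBeginning(tps);
        return consumer.position(tp);
    }

    // Returns the offset one past the last record in the partition.
    static long lastOffset(Consumer<String, byte[]> consumer, TopicPartition tp) {
        List<TopicPartition> tps = Collections.singletonList(tp);
        consumer.assign(tps);
        consumer.seekToEnd(tps);
        return consumer.position(tp);
    }
}
```

Note that the fragment above never polls before seeking, so the seek cannot be overridden by records already fetched under the auto.offset.reset policy.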
The seekToEnd method requires information about the actual partition (in Kafka terms, a TopicPartition) from which you want your consumer to read from the end.
With the round-robin assignor, every topic is distributed evenly across all consumers. Because this assignor treats every topic separately, you can expect all consumers to consume from all of the topics. With other assignors, one consumer might end up with all partitions of one topic and no partitions of the others.
The offset is a simple integer that Kafka uses to maintain the current position of a consumer. That's it. The current offset (the position) points one past the last record that Kafka has delivered to the consumer in the most recent poll — in other words, it is the offset of the next record to be fetched. This is why the consumer never receives the same record twice during normal polling.
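As a rough illustration of how the position advances past delivered records, here is a sketch using the client library's MockConsumer, which needs no broker; the topic name, keys, and values are made up:

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class PositionDemo {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer =
                new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("demo", 0);
        consumer.assign(Collections.singletonList(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        // Two records at offsets 0 and 1.
        consumer.addRecord(new ConsumerRecord<>("demo", 0, 0L, "k0", "v0"));
        consumer.addRecord(new ConsumerRecord<>("demo", 0, 1L, "k1", "v1"));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
        // After delivering the record at offset 1, the position points one
        // past it, so the next poll would start at offset 2.
        System.out.println(records.count() + " records, position=" + consumer.position(tp));
    }
}
```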
I have noticed that this behavior is buggy and inconsistent in the MockConsumer. The docs say that these methods are lazy and only take effect on the next poll() or position() call, but that is not true for the MockConsumer. In particular, I found that it works in MockConsumer between roughly versions 1.0 and 2.2.2, and is broken from 2.3.0 onwards.
Instead, I do the following, which works consistently in both the MockConsumer and the real one:
// consistently working seek to beginning
consumer.beginningOffsets(partitions).forEach(consumer::seek);
// consistently working seek to end
consumer.endOffsets(partitions).forEach(consumer::seek);
This is a bit more dangerous if other threads are concurrently calling poll(), but it works well in my case, where I just want manual control over the offset position when the application starts polling.
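To see this workaround in action without a broker, here is a sketch using MockConsumer; the topic name and the beginning/end offsets (5 and 42) are illustrative:

```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class SeekWorkaroundDemo {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer =
                new MockConsumer<>(OffsetResetStrategy.NONE);
        TopicPartition tp = new TopicPartition("demo", 0);
        List<TopicPartition> partitions = Collections.singletonList(tp);
        consumer.assign(partitions);
        // Tell the mock what the log boundaries are.
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
        consumer.updateEndOffsets(Collections.singletonMap(tp, 42L));

        // Explicit seek via the fetched offsets: no lazy evaluation involved.
        consumer.beginningOffsets(partitions).forEach(consumer::seek);
        System.out.println("after seek to beginning: " + consumer.position(tp)); // 5

        consumer.endOffsets(partitions).forEach(consumer::seek);
        System.out.println("after seek to end: " + consumer.position(tp)); // 42
    }
}
```

Because seek(TopicPartition, long) sets the position directly, this avoids the lazy-evaluation path entirely, which is why it behaves the same in the mock and the real consumer.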