 

Why don't Kafka's seekToBeginning and seekToEnd work with assign?

Say I want to check the offsets of the first and last message in Kafka for a particular partition. My idea was to use the assign(…) method along with the seekToBeginning(…) and seekToEnd(…) methods. Unfortunately, this doesn't work.

If I set AUTO_OFFSET_RESET_CONFIG to "latest", the seekToBeginning(…) has no effect; if I set it to "earliest", seekToEnd(…) doesn't work. It seems that the only thing that matters for my consumer is AUTO_OFFSET_RESET_CONFIG.

I've seen a similar question, but it dealt with subscribe(), not with assign(). The proposed solution was to implement a ConsumerRebalanceListener and pass it as a parameter to subscribe(). Unfortunately, assign() has only one signature, and it takes only a collection of topic partitions.

The question is: is it possible to use seekToBeginning() or seekToEnd() with assign()? If yes, how? If not, why not?

A relevant fragment of my code:

KafkaConsumer<String, ProtoMeasurement> consumer = createConsumer();
TopicPartition zeroP = new TopicPartition(TOPIC, 1);
List<TopicPartition> partitions = Collections.singletonList(zeroP);

consumer.assign(partitions);
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(partitions);
long currOffsetPos = consumer.position(zeroP);
LOGGER.info("Current offset {}.", currOffsetPos);
ConsumerRecords<String, ProtoMeasurement> records = consumer.poll(Duration.ofMillis(100));
// ...

The logger prints the offset n, which is the largest (latest) offset in that partition.

asked Oct 25 '19 at 10:10 by RobertSzooba



1 Answer

I have noticed that this behavior is buggy and inconsistent in the MockConsumer. The docs say that seekToBeginning() and seekToEnd() are evaluated lazily, i.e. the seek only takes effect on the next poll() or position() call. That is not true for the MockConsumer. In particular, I found that it works in the MockConsumer from roughly 1.0 to 2.2.2, and is broken from 2.3.0 onward.
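To make the documented lazy contract concrete, here is a toy model in plain Java. It is not Kafka's implementation; the class `LazySeekModel` and its fields are hypothetical, and it assumes a single partition with fixed log start/end offsets and no committed offsets. `seekToBeginning()`/`seekToEnd()` only record a pending reset, `position()` resolves it, and the `auto.offset.reset` strategy is consulted only when there is neither a position nor a pending reset. Under this model, the question's sequence (first poll, then seekToBeginning, then position) yields the beginning offset, not the end offset.

```java
// Toy model only: NOT Kafka's implementation. It mimics the documented
// contract of KafkaConsumer#seekToBeginning / #seekToEnd for one partition.
final class LazySeekModel {
    enum Reset { EARLIEST, LATEST }

    private final long logStartOffset;   // offset of the first record in the log
    private final long logEndOffset;     // offset the next written record would get
    private final Reset autoOffsetReset; // stands in for auto.offset.reset

    private Long position;      // null = no valid position yet
    private Reset pendingReset; // recorded by seekToBeginning/seekToEnd, resolved lazily

    LazySeekModel(long logStartOffset, long logEndOffset, Reset autoOffsetReset) {
        this.logStartOffset = logStartOffset;
        this.logEndOffset = logEndOffset;
        this.autoOffsetReset = autoOffsetReset;
    }

    // Lazy: only remembers the request; no offset is looked up here.
    void seekToBeginning() { pendingReset = Reset.EARLIEST; }

    void seekToEnd() { pendingReset = Reset.LATEST; }

    // Eager: an explicit offset replaces any pending reset immediately.
    void seek(long offset) {
        position = offset;
        pendingReset = null;
    }

    // position() (and poll(), not modeled here) resolves a pending reset first;
    // auto.offset.reset is used only when there is no position at all.
    long position() {
        if (pendingReset != null) {
            position = resolve(pendingReset);
            pendingReset = null;
        } else if (position == null) {
            position = resolve(autoOffsetReset);
        }
        return position;
    }

    private long resolve(Reset reset) {
        return reset == Reset.EARLIEST ? logStartOffset : logEndOffset;
    }

    public static void main(String[] args) {
        // Mirrors the question: auto.offset.reset=latest, a first position lookup, then seekToBeginning.
        LazySeekModel consumer = new LazySeekModel(0L, 42L, Reset.LATEST);
        System.out.println(consumer.position()); // 42, chosen by auto.offset.reset
        consumer.seekToBeginning();
        System.out.println(consumer.position()); // 0, the pending seek wins
    }
}
```

If a consumer (mock or real) keeps returning the auto.offset.reset position after seekToBeginning()/seekToEnd(), it is not honoring this contract.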

Instead of relying on seekToBeginning() and seekToEnd(), I do the following, which works consistently with both the MockConsumer and the real consumer:

// consistently working seek to beginning
consumer.beginningOffsets(partitions).forEach(consumer::seek);
// consistently working seek to end
consumer.endOffsets(partitions).forEach(consumer::seek);

This is a bit riskier if other threads call poll() concurrently, but it works well in my case, where I just want manual control over the starting offset position when the application begins polling.
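Since the original goal was only to read the offsets of the first and last message, seeking is not strictly needed: the maps returned by `beginningOffsets()` and `endOffsets()` already hold that information. One subtlety is that `endOffsets()` returns the offset of the next message to be written (the last available offset + 1), so the last existing message is at `end - 1`, and the partition is empty when `begin == end`. Below is a minimal sketch with a hypothetical helper class `OffsetRange`; it is generic over the map key so it compiles without kafka-clients (with a real consumer the key would be `TopicPartition`). For compacted or transactional topics, `end - 1` may not hold a consumable record, so treat it as an upper bound.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: first/last offsets per partition from the maps returned by
// consumer.beginningOffsets(partitions) and consumer.endOffsets(partitions).
final class OffsetRange {
    final long first; // offset of the earliest record still in the log
    final long last;  // offset of the latest record (first - 1 when the partition is empty)

    OffsetRange(long first, long last) {
        this.first = first;
        this.last = last;
    }

    boolean isEmpty() {
        return last < first;
    }

    // P is TopicPartition with a real consumer, e.g.
    // OffsetRange.fromOffsets(consumer.beginningOffsets(partitions), consumer.endOffsets(partitions))
    static <P> Map<P, OffsetRange> fromOffsets(Map<P, Long> beginning, Map<P, Long> end) {
        Map<P, OffsetRange> ranges = new LinkedHashMap<>();
        for (Map.Entry<P, Long> entry : beginning.entrySet()) {
            Long endOffset = end.get(entry.getKey());
            if (endOffset == null) {
                throw new IllegalArgumentException("No end offset for " + entry.getKey());
            }
            // The end offset is the offset of the *next* record, hence the -1.
            ranges.put(entry.getKey(), new OffsetRange(entry.getValue(), endOffset - 1));
        }
        return ranges;
    }

    @Override
    public String toString() {
        return isEmpty() ? "empty" : "[" + first + ", " + last + "]";
    }
}
```

For example, a beginning offset of 5 and an end offset of 10 give first = 5 and last = 9, while beginning = end = 3 gives an empty range.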

answered Nov 14 '22 at 23:11 by Scott Carey