While polling Kafka, I have subscribed to multiple topics using the subscribe() function. Now, I want to set the offset from which I want to read from each topic, without resubscribing after every seek() and poll() from a topic. Will calling seek() iteratively over each of the topic names, before polling for data, achieve the result? How exactly are the offsets stored in Kafka?
I have one partition per topic and just one consumer to read from all topics.
Offsets in Kafka are stored as messages in a separate topic named '__consumer_offsets'. Each consumer commits a message into that topic, either at periodic intervals (when auto-commit is enabled) or when it commits explicitly.
Each consumer group maintains its own offset per topic partition. If you need multiple independent subscribers, use multiple consumer groups. Within a consumer group, each record is delivered to only one consumer.
Kafka consumers can commit an offset for a partition. If the commit succeeds, the consumer can resume from the committed offset after a restart. Offsets within a partition are sequential, and the first message's offset is 0.
As Kafka adds each record to a partition, it assigns a unique sequential ID called an offset.
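To make the offset numbering concrete, here is a toy model of a single partition. It is only a sketch: the class name Partition and its methods are invented for illustration and are not part of any Kafka client.

```python
class Partition:
    """Toy append-only log; NOT Kafka code, names are hypothetical."""

    def __init__(self):
        self._records = []

    def append(self, value):
        # The offset is the record's position in the log, starting at 0.
        offset = len(self._records)
        self._records.append(value)
        return offset

    def read_from(self, offset):
        # Return (offset, value) pairs starting at the requested offset.
        return list(enumerate(self._records[offset:], start=offset))


partition = Partition()
for value in ["a", "b", "c"]:
    partition.append(value)

# A consumer that committed offset 1 would resume reading here.
resumed = partition.read_from(1)
```

Reading from a committed offset simply means starting at that position in the log.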
How does Kafka store offsets for each topic?
Kafka has moved offset storage from ZooKeeper to the Kafka brokers. The reason is given below:
Zookeeper is not a good way to service a high-write load such as offset updates because zookeeper routes each write through every node and hence has no ability to partition or otherwise scale writes. We have always known this, but chose this implementation as a kind of "marriage of convenience" since we already depended on zk.
Kafka stores offset commits in a topic. When a consumer commits an offset, Kafka publishes a commit message to a "commit-log" topic and keeps an in-memory structure that maps group/topic/partition to the latest offset for fast retrieval. More design information can be found on this page about offset management.
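The commit-log-plus-cache idea can be sketched in a few lines. This is a simplified illustration, not the broker's implementation: OffsetStore, commit and fetch are hypothetical names, and the "log" is just a Python list.

```python
class OffsetStore:
    """Toy model of offset storage; names and structure are assumptions."""

    def __init__(self):
        self.commit_log = []  # append-only list of commit messages
        self.latest = {}      # (group, topic, partition) -> latest offset

    def commit(self, group, topic, partition, offset):
        key = (group, topic, partition)
        # Every commit is appended to the log ...
        self.commit_log.append((key, offset))
        # ... and the in-memory map is updated for fast lookups.
        self.latest[key] = offset

    def fetch(self, group, topic, partition):
        # Returns None if the group has never committed for this partition.
        return self.latest.get((group, topic, partition))


store = OffsetStore()
store.commit("group-a", "orders", 0, 5)
store.commit("group-a", "orders", 0, 9)
store.commit("group-b", "orders", 0, 2)
```

Each group keeps its own position: here group-a would resume at 9 and group-b at 2 on the same partition, while the log retains all three commit messages.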
Now, I want to set the offset from which I want to read from each topic, without resubscribing after every seek() and poll() from a topic.
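The Kafka admin tools include a feature for resetting a consumer group's offsets (the group must not have active members while the reset runs):
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group your-consumer-group --reset-offsets --to-offset 1 --all-topics --execute
There are more options you can use, such as --to-earliest, --to-latest, --shift-by and --to-datetime, and --topic to limit the reset to a single topic.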
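For the in-code route asked about in the question, a minimal sketch follows. It assumes a consumer object that exposes assignment() and seek(tp, offset), as the kafka-python KafkaConsumer does; the helper name seek_assigned and the start_offsets mapping are hypothetical.

```python
def seek_assigned(consumer, start_offsets):
    """Seek every assigned partition whose topic appears in start_offsets.

    consumer      -- assumed to expose assignment() and seek(tp, offset)
    start_offsets -- dict of topic name -> offset to start reading from
    Returns the (topic, partition, offset) seeks that were issued.
    """
    applied = []
    # Sort only to make the order of seeks deterministic.
    assigned = sorted(consumer.assignment(),
                      key=lambda tp: (tp.topic, tp.partition))
    for tp in assigned:
        if tp.topic in start_offsets:
            offset = start_offsets[tp.topic]
            consumer.seek(tp, offset)
            applied.append((tp.topic, tp.partition, offset))
    return applied
```

With subscribe(), partitions are assigned only once the consumer has joined the group during poll(), so a helper like this has to run after the first poll() returns or from a rebalance listener. Once the seeks are applied, later poll() calls read from the new positions without resubscribing.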