I have three questions:
    // OffsetOldest stands for the oldest offset available on the broker for a
    // partition.
    OffsetOldest int64 = -2
Suppose:
A. Three brokers are running on the same machine.
B. The consumer group has only one consumer thread.
C. The consumer is configured with the OffsetOldest flag.
D. 100 messages have been produced, and the consumer thread has consumed 90 of them so far.
If the consumer thread is restarted, which offset will it start consuming from? Is it 91 or 0?
In our code below, it seems to reconsume messages every time the consumer is started, but this doesn't actually happen every time. Why does reconsumption happen only on some restarts and not on all of them?
    func (this *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for message := range claim.Messages() {
            this.handler(message)
            session.MarkMessage(message, "")
        }
        return nil
    }
    ctx := context.Background()
    conf := sarama.NewConfig()
    conf.Version = sarama.V2_0_0_0
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    conf.Consumer.Return.Errors = true
    consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf)
    if err != nil {
        logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err)
        return
    }
No. When the retention policy is applied, older messages are deleted from the topics. Therefore, the oldest offset might not be the first-ever offset (i.e. 0).
It depends on your configuration. Essentially, you have 3 options:
earliest offset
latest offset
You have to use sarama.OffsetOldest. From the documentation,

    const (
        // OffsetNewest stands for the log head offset, i.e. the offset that will be
        // assigned to the next message that will be produced to the partition. You
        // can send this to a client's GetOffset method to get this offset, or when
        // calling ConsumePartition to start consuming new messages.
        OffsetNewest int64 = -1
        // OffsetOldest stands for the oldest offset available on the broker for a
        // partition. You can send this to a client's GetOffset method to get this
        // offset, or when calling ConsumePartition to start consuming from the
        // oldest offset that is still available on the broker.
        OffsetOldest int64 = -2
    )