Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to consume from latest offset using Sarama Go Kafka Consumer

I have three questions:

  1. what is the meaning of "oldest offset"? Oldest offset doesn't mean offset 0?

// OffsetOldest stands for the oldest offset available on the broker for a
// partition.
OffsetOldest int64 = -2

  1. Supposing

    A. Three brokers running on same machine
    B. The consumer group has only one consumer thread
    C. The consumer configs OffsetOldest flag.
    D. There have been 100 msgs produced and currently the consumer thread have consumed 90 msgs.

    So if the consumer thread restarted, then which offset will this consumer start to consume from? it is 91 or 0?

  2. In our code below, it seems to reconsume messages everytime the consumer is started. but actually it doest happen all times. Why reconsuming just happens a few times just after restart(not all)?

     func (this *consumerGroupHandler) ConsumeClaim(session 
     sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
              for message := range claim.Messages() {
              this.handler(message)
             session.MarkMessage(message, "")
        }
    
        return nil
    }
    
    ctx := context.Background()
    conf := sarama.NewConfig()
    
    conf.Version = sarama.V2_0_0_0
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    conf.Consumer.Return.Errors = true
    
    consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf)
    if err != nil {
        logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err)
        return
    }
    
like image 323
Wallace Avatar asked Nov 18 '25 14:11

Wallace


1 Answers

  1. No. When the retention policy is applied, older messages are deleted from the topics. Therefore, the oldest offset might not be the first-ever offset (i.e. 0).

  2. It depends on your configuration. Essentially, you have 3 options:

    • Start consuming from the earliest offset
    • Start consuming from the latest offset
    • Start consuming from a specific offset
  3. You have to use sarama.OffsetOldest. From the documentation,

 const (
        // OffsetNewest stands for the log head offset, i.e. the offset that will be
        // assigned to the next message that will be produced to the partition. You
        // can send this to a client's GetOffset method to get this offset, or when
        // calling ConsumePartition to start consuming new messages.
        OffsetNewest int64 = -1
        // OffsetOldest stands for the oldest offset available on the broker for a
        // partition. You can send this to a client's GetOffset method to get this
        // offset, or when calling ConsumePartition to start consuming from the
        // oldest offset that is still available on the broker.
        OffsetOldest int64 = -2
    )
like image 85
Giorgos Myrianthous Avatar answered Nov 21 '25 21:11

Giorgos Myrianthous



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!