Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to get the latest offset value from a confluent_python AVRO consumer

Tags:

I am pretty new to confluent_kafka but I've gained some experience with kafka-python. What I am trying to do is changing the offset where to start consuming messages. This why I'd like to build a consumer client able to move back to previous messages in order to return data that will populate a dashboard. Said that using the kafka-python package I can use the seek_to_end (https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py#L788) method in order to get the position value of the latest commit. Having that I can subtract values and get back to the previous messages using the seek method (https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py#L738)

On the other hand, the conflient_kafka seems to have no similar functions and what I've found so far is to use the variables OFFSET_END which has a value of -1 and it doesn't return me the offset numeric value of the latest and largest one. I can use the 'seek' function as well, but I need a way to have the numeric value of the latest offset and not -1.

My avro consumer looks like

from confluent_kafka.avro import AvroConsumer

if __name__ == '__main__':
     c = AvroConsumer({"bootstrap.servers": "locahost:29092", "group.id":"mygroup",'schema.registry.url': 'http://localhost:8081',
                  'enable.auto.commit': True,'default.topic.config': {'auto.offset.reset': 'smallest'}})

def my_assign (consumer, partitions):
    for p in partitions:
        p.offset = confluent_kafka.OFFSET_END
        print("offset=",p.offset)
    print('assign', partitions)
    print('position:',consumer.position(partitions))
    consumer.assign(partitions)

c.subscribe(["mytopic"],on_assign=my_assign)

while True:
    m = c.poll(1)
    if m is None:
        continue

    if m.error() is None:
        print('Received message', m.value(),m.offset())
c.close()

which produces the following result:

offset= -1
assign [TopicPartition{topic=mytopic,partition=0,offset=-1,error=None}]
position: [TopicPartition{topic=mytopic,partition=0,offset=-1001,error=None}]

and stays waiting for the next message. I was wondering if someone can help me out. Thanks

like image 304
hellbreak Avatar asked Apr 22 '18 16:04

hellbreak


1 Answers

You can use Consumer.get_watermark_offsets (see docs)

Example:

cfg = {
    # ... ...
    "group.id": str(uuid4())
}
consumer = AvroConsumer(cfg)
topic_partition = TopicPartition("topic-name", partition=123)
low, high = consumer.get_watermark_offsets(topic_partition)
print("the latest offset is {}".format(high))
like image 133
Ming Avatar answered Oct 11 '22 16:10

Ming