
How to programmatically get latest offset per Kafka topic partition in Python

I'm new to Kafka and want to get the position of a Kafka topic, per partition. The documentation (https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html#kafkaadminclient) shows that the offset is available via KafkaAdminClient.list_consumer_group_offsets, but I don't see a method there for the position.

Does anybody know how I can get it?

Chedva asked Oct 21 '25 23:10



1 Answer

Using confluent-kafka-python

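You can use position. Note that it reports this consumer's own position (the offset of the last consumed message + 1), so it is only meaningful for partitions the consumer has been assigned and has read from; for the latest offset in the log itself, use get_watermark_offsets, described further down: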

Retrieve current positions (offsets) for the list of partitions.

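from confluent_kafka import Consumer, TopicPartition


# group.id must be set for a confluent-kafka Consumer; "offset-checker" is a placeholder
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "offset-checker"})
# Fetch metadata for the topic and build one TopicPartition per partition id
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in topic.topics['topicName'].partitions]

# Returns a list of TopicPartition objects with the offset field filled in
offset_per_partition = consumer.position(partitions)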

Alternatively, you can use get_watermark_offsets, which takes one partition at a time and therefore needs one call per partition:

Retrieve low and high offsets for partition.

from confluent_kafka import Consumer, TopicPartition


# group.id must be set for a confluent-kafka Consumer; "offset-checker" is a placeholder
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "offset-checker"})
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in topic.topics['topicName'].partitions]

for p in partitions:
    # Returns the (low, high) watermark pair for this partition
    low_offset, high_offset = consumer.get_watermark_offsets(p)
    print(f"Latest offset for partition {p.partition}: {high_offset}")
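The (low, high) pair can also give a rough idea of how much data each partition currently holds. Here is a minimal sketch, assuming the watermarks have already been collected into a plain dict keyed by partition id; the helper name summarize_watermarks and the sample numbers are made up for illustration and are not part of confluent-kafka.

```python
def summarize_watermarks(watermarks):
    """Map partition id -> latest offset and retained offset span.

    `watermarks` is a hypothetical dict {partition_id: (low, high)}, for
    example built by calling consumer.get_watermark_offsets(tp) per partition.
    """
    summary = {}
    for partition_id, (low, high) in sorted(watermarks.items()):
        summary[partition_id] = {
            # High watermark: the offset the next produced message will get
            "latest_offset": high,
            # Span of offsets between the two watermarks
            "retained_offsets": high - low,
        }
    return summary


# Illustrative numbers only, not real broker output.
sample = {0: (0, 42), 1: (10, 10), 2: (5, 30)}
for partition_id, info in summarize_watermarks(sample).items():
    print(partition_id, info)
```

The difference high - low counts offsets, not necessarily records: on compacted or transactional topics some offsets in that range may not correspond to a readable message.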

Using kafka-python

You can use end_offsets:

Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

This method does not change the current consumer position of the partitions.

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer


consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
# partitions_for_topic returns the set of partition ids for the topic
partitions = [TopicPartition('myTopic', p) for p in consumer.partitions_for_topic('myTopic')]
# Maps each TopicPartition to its end offset
last_offset_per_partition = consumer.end_offsets(partitions)
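Because end_offsets returns the offset of the upcoming message, the offset of the last written message is one less. A small sketch of that conversion follows; the helper last_message_offsets is hypothetical, and a namedtuple with the same fields as kafka.TopicPartition stands in for the real class so the snippet runs without a broker.

```python
from collections import namedtuple

# Stand-in with the same fields as kafka.TopicPartition (assumption for this sketch)
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def last_message_offsets(end_offsets):
    """Convert end offsets (next offset to be written) to last-message offsets.

    An end offset of 0 means nothing has been written to the partition yet,
    so there is no last message and the value is None.
    """
    return {
        tp: (offset - 1 if offset > 0 else None)
        for tp, offset in end_offsets.items()
    }


# Illustrative input shaped like the result of consumer.end_offsets(partitions)
sample = {TopicPartition("myTopic", 0): 5, TopicPartition("myTopic", 1): 0}
print(last_message_offsets(sample))
```

If retention has already deleted older data, the message at that offset may no longer be readable; comparing against consumer.beginning_offsets(partitions) shows whether the partition is currently empty.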

If you want to iterate through all topics, the following will do the trick:

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer


consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
# topics() returns the set of topic names visible to this consumer
kafka_topics = consumer.topics()
for topic in kafka_topics:
    partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    last_offset_per_partition = consumer.end_offsets(partitions)
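The loop above overwrites last_offset_per_partition on every iteration. As a sketch of one way to keep the results for all topics, the hypothetical helper below collects them into a nested dict. It also skips topics for which partitions_for_topic returns None, which can happen when metadata for a topic is not available. The fallback namedtuple is only an assumption to let the helper be tried without kafka-python installed.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition  # real class when kafka-python is installed
except ImportError:
    # Fallback with the same fields, so the helper can be exercised without the library
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def end_offsets_by_topic(consumer):
    """Return {topic: {partition_id: end_offset}} for every topic the consumer sees.

    `consumer` is expected to behave like kafka.KafkaConsumer, i.e. to provide
    topics(), partitions_for_topic() and end_offsets().
    """
    result = {}
    for topic in sorted(consumer.topics()):
        partition_ids = consumer.partitions_for_topic(topic)
        if not partition_ids:
            # No partition metadata for this topic; skip it
            continue
        partitions = [TopicPartition(topic, p) for p in sorted(partition_ids)]
        offsets = consumer.end_offsets(partitions)
        result[topic] = {tp.partition: offset for tp, offset in offsets.items()}
    return result
```

With a running broker this could be called as end_offsets_by_topic(KafkaConsumer(bootstrap_servers="localhost:9092")).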
Giorgos Myrianthous answered Oct 24 '25 13:10
