
How to get latest offset for a partition for a kafka topic?

I am using the Python high level consumer for Kafka and want to know the latest offsets for each partition of a topic. However I cannot get it to work.

```python
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

# brokers and topic are defined elsewhere
con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print("For partition %s highwater is %s" % (p.partition, con.highwater(p)))

print("Subscription = %s" % con.subscription())
print("con.seek_to_beginning() = %s" % con.seek_to_beginning())
```

But the output I get is

```
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
```

I have an alternate approach using assign, but the result is the same:

```python
# brokers and topic are defined elsewhere
con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print("For partition %s highwater is %s" % (p.partition, con.highwater(p)))

print("Subscription = %s" % con.subscription())
print("con.seek_to_beginning() = %s" % con.seek_to_beginning())
print("con.seek_to_end() = %s" % con.seek_to_end())
```

It seems from some of the documentation that I might get this behaviour if a fetch has not been issued. But I cannot find a way to force that. What am I doing wrong?

Or is there a different/simpler way to get the latest offsets for a topic?
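kafka-python's documentation for highwater() supports the suspicion about fetches: the highwater offset (the offset the next produced message will get, i.e. one past the newest message) is delivered in fetch responses, so highwater() returns None for a partition until a fetch has been sent for it. A minimal sketch of forcing fetches, assuming `consumer` is a kafka-python KafkaConsumer that has already been assign()ed the partitions as in the code above: keep calling poll() until every partition reports a highwater or a deadline passes. The helper name and its parameters are hypothetical, and poll() really does consume records, which this sketch simply discards.

```python
import time


def highwaters_after_poll(consumer, partitions, timeout_s=10.0, poll_ms=500):
    """Hypothetical helper: poll an already-assigned KafkaConsumer until
    highwater() is known for every partition or timeout_s elapses.

    Partitions whose highwater is still unknown at the deadline map to None.
    Records returned by poll() are discarded, but the consumer's position
    still advances past them.
    """
    deadline = time.monotonic() + timeout_s
    highwaters = {tp: None for tp in partitions}
    while time.monotonic() < deadline:
        consumer.poll(timeout_ms=poll_ms)  # sends fetch requests as a side effect
        highwaters = {tp: consumer.highwater(tp) for tp in partitions}
        if all(hw is not None for hw in highwaters.values()):
            break
    return highwaters
```

With the question's variables, highwaters_after_poll(con, ps) would return a dict mapping each TopicPartition to its highwater, or to None if no fetch response arrived for it in time. Several polls may be needed before all 100 partitions have been fetched.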

Saket asked Feb 16 '16 12:02



People also ask

How do I get Kafka topic offset?

You can use the kafka-console-consumer command with the --partition and --offset flags to read from a specific partition and offset; this also works if your Kafka topic is in Confluent Cloud. For Confluent Cloud topics, you can also read messages from a specified partition and offset using the Confluent Cloud Console.

How do you check last committed offset in Kafka?

To get the last committed offset of a topic partition, you can use the KafkaConsumer.committed(TopicPartition partition) method.
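In kafka-python the corresponding call is committed(partition), which returns the consumer group's last committed offset for that partition, or None if nothing has been committed. A common reason to fetch both the committed and the latest offset is to compute consumer lag. A minimal sketch, assuming a kafka-python KafkaConsumer created with the relevant group_id and a library and broker version that support end_offsets(); the helper name consumer_lag is hypothetical.

```python
def consumer_lag(consumer, partitions):
    """Hypothetical helper: per-partition lag = latest offset - committed offset.

    end_offsets() returns the offset the *next* produced message will get, and
    a committed offset is by convention the next offset to read, so a fully
    caught-up group has lag 0. Partitions with no committed offset map to None.
    """
    latest = consumer.end_offsets(partitions)  # {TopicPartition: int}
    lag = {}
    for tp in partitions:
        committed = consumer.committed(tp)  # int, or None if never committed
        lag[tp] = None if committed is None else latest[tp] - committed
    return lag
```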

Is Kafka offset per partition?

Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.

What is offset in Kafka partition?

The offset is a unique, sequential id that Kafka assigns to each message within a partition. Its main use is identifying messages: a message is located by its partition and its offset. It also describes a consumer's progress: the consumer's position is the offset of the next message to be delivered to it.
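Because the "latest" offset Kafka reports for a partition is the offset the next message will receive (one past the newest stored message), a little arithmetic turns a partition's earliest and latest offsets into something more readable. A small sketch of that arithmetic; the function name is hypothetical, and the count is only an upper bound on compacted topics or topics with transaction markers, where offsets can have gaps.

```python
def describe_partition(earliest, latest):
    """Hypothetical helper.

    earliest: offset of the oldest message still stored in the partition.
    latest:   offset the next produced message will receive (log end offset).
    Returns (message_count_upper_bound, offset_of_newest_message_or_None).
    """
    if latest < earliest:
        raise ValueError("latest offset cannot be smaller than earliest offset")
    count = latest - earliest
    newest = latest - 1 if count > 0 else None  # empty partition has no newest message
    return count, newest
```

For example, a partition whose earliest offset is 5 and latest offset is 12 holds at most 7 messages, the newest at offset 11.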


2 Answers

Finally, after spending a day on this and several false starts, I found a solution that works. I'm posting it here so that others can refer to it.

```python
# Uses kafka-python's legacy 1.x API.
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload

# brokers and topic are defined elsewhere
client = SimpleClient(brokers)

partitions = client.topic_partitions[topic]
# OffsetRequestPayload(topic, partition, time, max_offsets):
# time=-1 asks for the latest offset; max_offsets=1 returns a single offset.
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]

offsets_responses = client.send_offset_request(offset_requests)

for r in offsets_responses:
    print("partition = %s, offset = %s" % (r.partition, r.offsets[0]))
```
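SimpleClient and the *Payload request types belong to kafka-python's legacy API and were removed in kafka-python 2.0. With a newer kafka-python release and brokers 0.10.1 or later, KafkaConsumer.end_offsets() returns the latest offsets directly, without assigning partitions, fetching, or changing the consumer's position (beginning_offsets() does the same for the earliest offsets). A minimal sketch under those assumptions; `brokers` and `topic` are placeholders as in the question, and the function names are hypothetical.

```python
def latest_offsets(consumer, partitions):
    """Hypothetical helper: {partition number: latest offset}.

    `partitions` is a list of TopicPartition objects; end_offsets() reports the
    offset of the next message to be produced (newest message offset + 1).
    """
    ends = consumer.end_offsets(partitions)
    return {tp.partition: ends[tp] for tp in partitions}


def main(brokers, topic):
    # Imported here so latest_offsets() itself has no hard kafka-python dependency.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers=brokers)
    try:
        # partitions_for_topic() may return None if the topic metadata is unknown.
        partitions = [TopicPartition(topic, p)
                      for p in sorted(consumer.partitions_for_topic(topic) or ())]
        for partition, offset in sorted(latest_offsets(consumer, partitions).items()):
            print("partition = %s, offset = %s" % (partition, offset))
    finally:
        consumer.close()
```

Calling main("localhost:9092", "mytopic") would print one line per partition in the same format as the answer above.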
Saket answered Sep 21 '22 15:09



If you want to use the Kafka shell scripts in kafka/bin, you can get the latest and smallest (earliest) offsets by running the GetOffsetShell tool through kafka-run-class.sh.

To get the latest offset, the command looks like this:

```
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topicname
```

To get the smallest offset, the command looks like this:

```
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topicname
```
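GetOffsetShell prints one line per partition, so the two commands above can be combined from a script. A minimal sketch, assuming each output line has the form topic:partition:offset with a single offset per line (which is what the default settings produce, as far as I know); KAFKA_HOME, the broker address and the function names are hypothetical placeholders.

```python
import subprocess


def parse_get_offset_shell(output):
    """Parse 'topic:partition:offset' lines into {partition: offset}."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        _topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


def run_get_offset_shell(kafka_home, broker, topic, time_arg):
    """Run GetOffsetShell (time_arg -1 = latest, -2 = earliest) and parse stdout."""
    cmd = [
        kafka_home + "/bin/kafka-run-class.sh", "kafka.tools.GetOffsetShell",
        "--broker-list", broker, "--time", str(time_arg), "--topic", topic,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return parse_get_offset_shell(result.stdout)


def messages_per_partition(earliest, latest):
    """Upper bound on stored messages per partition: latest - earliest."""
    return {p: latest[p] - earliest.get(p, 0) for p in latest}
```

For example, messages_per_partition(run_get_offset_shell(home, "localhost:9092", "topicname", -2), run_get_offset_shell(home, "localhost:9092", "topicname", -1)) would give the per-partition difference between the two commands' results.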

You can find more information on GetOffsetShell at the following link.

Hope this helps!

avr answered Sep 18 '22 15:09
