I am using the high-level Python consumer for Kafka (KafkaConsumer from kafka-python) and want to know the latest offset for each partition of a topic. However, I cannot get it to work.
```python
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print("For partition %s highwater is %s" % (p.partition, con.highwater(p)))

print("Subscription = %s" % con.subscription())
print("con.seek_to_beginning() = %s" % con.seek_to_beginning())
```
But the output I get is:

```
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
```
I have an alternate approach using assign, but the result is the same:
```python
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print("For partition %s highwater is %s" % (p.partition, con.highwater(p)))

print("Subscription = %s" % con.subscription())
print("con.seek_to_beginning() = %s" % con.seek_to_beginning())
print("con.seek_to_end() = %s" % con.seek_to_end())
```
It seems from some of the documentation that I might get this behaviour if a fetch has not been issued, but I cannot find a way to force one. What am I doing wrong?
Or is there a different/simpler way to get the latest offsets for a topic?
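A likely explanation, based on the kafka-python documentation as I understand it: highwater() is only filled in from fetch responses, so it stays None until poll() has actually fetched data, and seek_to_beginning()/seek_to_end() merely schedule an offset reset and return None. Below is a minimal sketch of two ways to force the lookup, assuming a kafka-python version that provides KafkaConsumer.end_offsets() (older releases do not, and as far as I know it needs a 0.10.1+ broker). The helper names are hypothetical; brokers and topic are the question's placeholders.

```python
def latest_via_seek(consumer, partitions):
    """Latest offsets for partitions already passed to consumer.assign()."""
    # seek_to_end() only marks the partitions for an offset reset and
    # returns None, which is why printing its result shows None.
    consumer.seek_to_end(*partitions)
    # position() sees the pending reset, looks up the latest offset from the
    # broker, and returns the offset the next fetched record would have.
    return {tp.partition: consumer.position(tp) for tp in partitions}


def latest_via_end_offsets(consumer, partitions):
    """Latest offsets via end_offsets(); no fetch is needed first."""
    # end_offsets() returns {TopicPartition: offset} and leaves the
    # consumer's own positions unchanged.
    ends = consumer.end_offsets(partitions)
    return {tp.partition: offset for tp, offset in ends.items()}


def main(brokers, topic):
    # Imported here so the helpers above can be tried without kafka-python.
    from kafka import KafkaConsumer, TopicPartition

    con = KafkaConsumer(bootstrap_servers=brokers)
    ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
    con.assign(ps)
    print(latest_via_seek(con, ps))
    print(latest_via_end_offsets(con, ps))
    con.close()
```

Calling main("localhost:9092", "my-topic") would print the same mapping twice, unless new records arrive between the two lookups.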
If your Kafka topic is in Confluent Cloud, you can use the kafka-console-consumer command with the --partition and --offset flags to read from a specific partition and offset. You can also read messages from a specified partition and offset using the Confluent Cloud Console.
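The console consumer's --partition/--offset behaviour has a rough kafka-python counterpart: assign the partition manually, seek() to the offset, then poll(). A minimal sketch; read_from_offset is a hypothetical helper, and a single poll() may come back empty (for example while the connection is still being set up), so real code would usually poll in a loop.

```python
def read_from_offset(consumer, tp, offset, timeout_ms=1000):
    """Read one batch of (offset, value) pairs from tp, starting at offset.

    consumer is assumed to be a kafka-python KafkaConsumer and tp a
    TopicPartition.
    """
    consumer.assign([tp])       # manual assignment, no consumer-group rebalance
    consumer.seek(tp, offset)   # the next fetch for tp starts at this offset
    batch = consumer.poll(timeout_ms=timeout_ms)
    # poll() returns {TopicPartition: [ConsumerRecord, ...]}
    return [(record.offset, record.value) for record in batch.get(tp, [])]
```

With a real client this would be called as read_from_offset(KafkaConsumer(bootstrap_servers=brokers), TopicPartition(topic, 0), 42).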
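To get the last committed offset of a topic partition, you can use the KafkaConsumer.committed(TopicPartition partition) method (that is the Java client's signature; in kafka-python it is KafkaConsumer.committed(partition)). Note that this is the offset last committed by your consumer group, not the latest offset written to the partition.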
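Comparing the committed offset with the latest offset gives the consumer group's lag per partition. A minimal sketch assuming kafka-python's committed() and end_offsets(); the consumer must be created with the group_id you want to inspect, and both helper names are hypothetical.

```python
def gather_offsets(consumer, partitions):
    """Return (latest, committed), each {partition: offset}.

    consumer is assumed to be a kafka-python KafkaConsumer created with the
    group_id being inspected; committed() gives None if that group has never
    committed an offset for the partition.
    """
    ends = consumer.end_offsets(partitions)
    latest = {tp.partition: offset for tp, offset in ends.items()}
    committed = {tp.partition: consumer.committed(tp) for tp in partitions}
    return latest, committed


def consumer_lag(latest, committed):
    """Lag = latest offset - committed offset; None if never committed."""
    lag = {}
    for partition, end in latest.items():
        done = committed.get(partition)
        lag[partition] = None if done is None else end - done
    return lag
```

A committed offset is the offset of the next record the group will read, so a lag of 0 means the group is fully caught up on that partition.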
Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.
Offset in Kafka: the offset is a unique ID assigned to each message within a partition. Its main use is to identify the messages stored in that partition. A consumer's offset (its position) is the offset of the next message to be delivered to that consumer.
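To make this concrete, here is a toy in-memory model of a single partition (an illustration only, not how Kafka actually stores data; PartitionLog is a hypothetical class). It shows that the "latest offset" a broker reports is the offset the next record will receive, one more than the last record's offset, and that the earliest offset need not be 0 once old records have been removed by retention.

```python
class PartitionLog:
    """Toy model of one partition: records addressed by sequential offsets."""

    def __init__(self, start_offset=0):
        self.start_offset = start_offset  # earliest offset still retained
        self.records = []

    def end_offset(self):
        # The offset the *next* appended record will get.
        return self.start_offset + len(self.records)

    def append(self, value):
        # A new record takes the current end offset as its own offset.
        offset = self.end_offset()
        self.records.append(value)
        return offset

    def read(self, offset):
        return self.records[offset - self.start_offset]


log = PartitionLog()
log.append("a")
log.append("b")
print(log.end_offset())
```

Here the two records sit at offsets 0 and 1, while the latest offset printed is 2.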
Finally, after spending a day on this and several false starts, I was able to find a solution and get it working. Posting it here so that others may refer to it.
```python
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload
from kafka.protocol.offset import OffsetResetStrategy

client = SimpleClient(brokers)

# Partition ids for the topic, taken from the cluster metadata
partitions = client.topic_partitions[topic]
# For each partition, ask for one offset (max_offsets=1) at time LATEST (-1)
offset_requests = [OffsetRequestPayload(topic, p, OffsetResetStrategy.LATEST, 1)
                   for p in partitions.keys()]

offsets_responses = client.send_offset_request(offset_requests)

for r in offsets_responses:
    print("partition = %s, offset = %s" % (r.partition, r.offsets[0]))
```
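The time value in OffsetRequestPayload selects which end of the log to ask about: -1 for the latest offset and -2 for the earliest, the same convention GetOffsetShell's --time flag uses. SimpleClient was removed in kafka-python 2.0, so on newer versions a minimal sketch of the same lookup for both ends goes through the consumer instead, assuming beginning_offsets()/end_offsets() are available; offset_ranges is a hypothetical helper.

```python
def offset_ranges(consumer, partitions):
    """Return {partition: (earliest, latest)} for a list of TopicPartitions.

    beginning_offsets() plays the role of time=-2 and end_offsets() the role
    of time=-1; both return {TopicPartition: offset}.
    """
    first = consumer.beginning_offsets(partitions)
    last = consumer.end_offsets(partitions)
    return {tp.partition: (first[tp], last[tp]) for tp in partitions}
```

For each partition, latest minus earliest is roughly the number of records currently retained; log compaction and transaction markers can make it an overestimate.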
If you prefer to use the Kafka shell scripts in kafka/bin, you can get the latest and earliest (smallest) offsets with kafka-run-class.sh.
To get the latest offsets, the command looks like this:

```shell
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topicname
```
To get the earliest (smallest) offsets, the command looks like this:

```shell
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topicname
```
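If you drive the script from Python, its output can be turned into a dict. A minimal sketch, assuming GetOffsetShell prints one topic:partition:offset line per partition on stdout; the helper names and the kafka_home argument are hypothetical.

```python
import subprocess


def run_get_offset_shell(kafka_home, broker, topic, time):
    """Run GetOffsetShell (time=-1 for latest, -2 for earliest); return stdout."""
    cmd = [
        kafka_home + "/bin/kafka-run-class.sh", "kafka.tools.GetOffsetShell",
        "--broker-list", broker, "--time", str(time), "--topic", topic,
    ]
    return subprocess.run(cmd, check=True, capture_output=True, text=True).stdout


def parse_get_offset_shell(output):
    """Turn 'topic:partition:offset' lines into {partition: offset}."""
    offsets = {}
    for line in output.splitlines():
        parts = line.strip().rsplit(":", 2)
        if len(parts) != 3:
            continue  # blank or unrelated line
        _topic, partition, offset = parts
        try:
            offsets[int(partition)] = int(offset)
        except ValueError:
            continue  # not an offset line (e.g. a log message)
    return offsets


def retained_counts(latest, earliest):
    """latest - earliest per partition: roughly how many records are retained."""
    return {p: latest[p] - earliest[p] for p in latest}
```

For example, parsing the -1 and -2 outputs and passing both dicts to retained_counts gives a per-partition record count without writing any consumer code.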
You can find more information on GetOffsetShell at the following link:
Hope this helps!