I'm writing a kafka
consumer using Java. I want to keep the real time of the message, so if there are too many messages waiting for consuming, such as 1000 or more, I should abandon the unconsumed messages and start consuming from the last offset.
For this problem, I try to compare the last committed offset and the end offset of a topic(only 1 partition), if the difference between these two offsets is larger than a certain amount, I will set the last committed offset of the topic as next offset so that I can abandon those redundant messages.
Now my problem is how to get the end offset of a topic, some people say I can use old consumer, but it's too complicated, do new consumer has this function?
Using kafka-python You can use end_offsets : Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1. This method does not change the current consumer position of the partitions.
To get the last committed offset of a topic partitions you can use the KafkaConsumer. committed(TopicPartition partition) function.
Short Answer. If your Kafka topic is in Confluent Cloud, use the kafka-console-consumer command with the --partition and --offset flags to read from a specific partition and offset. You can also read messages from a specified partition and offset using the Confluent Cloud Console: Run it.
It commits the offset, indicating that all the previous records from that partition have been processed. So, if a consumer stops and comes back later, it restarts from the last committed position (if assigned to that partition again). Note that this behavior is configurable.
The new consumer is also complicated.
//assign the topic consumer.assign();
//seek to end of the topic consumer.seekToEnd();
//the position is the latest offset consumer.position();
You can also use the kafka server command line tools:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name
Output is of the form <topicName>:<partitionID>:<offset>
, e.g. t1:0:0
, see https://jaceklaskowski.gitbooks.io/apache-kafka/kafka-tools-GetOffsetShell.html for further details.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With