I'm using Kafka version 0.9.0.0 and I want to count the number of messages in a topic without using the admin script kafka-console-consumer.sh.
I have tried all the commands in the answers to "Java, How to get number of messages in a topic in apache kafka", but none of them yields a result. Can anyone help me out here?
The Kafka cluster retains all published messages, whether or not they have been consumed, for a configurable period of time. For example, if the log retention is set to two days, a message is available for consumption for two days after it is published, after which it is discarded to free up space.
To find the number of bytes received by a topic, you can read this metric on the server side: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec, or check the outgoing-byte-rate metric on the producer side.
Kafka stores all messages with the same key in a single partition. Each new message in a partition gets an ID that is one more than the previous ID. This ID is also called the offset. So the first message is at offset 0, the second message is at offset 1, and so on.
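To make the offset numbering concrete, here is a minimal in-memory sketch of a single partition as an append-only list. It is an illustration only, not Kafka code: the class PartitionLogSketch and its methods are hypothetical names, and partitionFor uses String.hashCode as a stand-in for whatever hash the real partitioner applies to the key.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionLogSketch {
    // Records in arrival order; the list index plays the role of the offset.
    private final List<String> records = new ArrayList<>();

    // Appends a record and returns the offset assigned to it.
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // Assumption: a simple stand-in for key-based partitioning.
    // The same key always maps to the same partition number.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        PartitionLogSketch log = new PartitionLogSketch();
        System.out.println(log.append("first"));   // prints 0
        System.out.println(log.append("second"));  // prints 1
        System.out.println(log.append("third"));   // prints 2
        // Same key, same partition, regardless of how often it is computed.
        System.out.println(partitionFor("user-42", 3) == partitionFor("user-42", 3)); // prints true
    }
}
```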
maximum 200,000 partitions per Kafka cluster (in total; distributed over many topics) resulting in a maximum of 50 brokers per Kafka cluster.
You could try to execute the command below:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1
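The command prints one line per partition in the form topic:partition:offset. Then sum up the offsets over all partitions. If older messages have already been deleted by retention, run the command again with --time -2 to get the earliest offsets and subtract them from the latest ones.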
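The summing step can be sketched in Java as follows. This is a hedged illustration, assuming the tool prints lines of the form topic:partition:offset; the class OffsetShellSum, its methods, and the sample offsets are made up for the example and are not part of Kafka.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetShellSum {
    // Parses lines like "test-topic:0:120" into partition -> offset.
    // Lines that do not have exactly three non-empty fields are skipped.
    static Map<Integer, Long> parse(List<String> lines) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split(":");
            if (parts.length != 3) {
                continue;
            }
            offsets.put(Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
        }
        return offsets;
    }

    // Sums (latest - earliest) over all partitions found in the latest output.
    // A partition missing from the earliest output is treated as starting at 0.
    static long count(List<String> latestLines, List<String> earliestLines) {
        Map<Integer, Long> latest = parse(latestLines);
        Map<Integer, Long> earliest = parse(earliestLines);
        long total = 0;
        for (Map.Entry<Integer, Long> e : latest.entrySet()) {
            total += e.getValue() - earliest.getOrDefault(e.getKey(), 0L);
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical output of the command with --time -1 (latest offsets).
        List<String> latest = Arrays.asList(
                "test-topic:0:120", "test-topic:1:95", "test-topic:2:110");
        // Hypothetical output of the command with --time -2 (earliest offsets).
        List<String> earliest = Arrays.asList(
                "test-topic:0:20", "test-topic:1:0", "test-topic:2:10");
        System.out.println(count(latest, earliest)); // prints 295
    }
}
```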
Updated: Java implementation
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("your_topic"));
    Set<TopicPartition> assignment;
    while ((assignment = consumer.assignment()).isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
    final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment);
    assert (endOffsets.size() == beginningOffsets.size());
    assert (endOffsets.keySet().equals(beginningOffsets.keySet()));
    Long totalCount = beginningOffsets.entrySet().stream().mapToLong(entry -> {
        TopicPartition tp = entry.getKey();
        Long beginningOffset = entry.getValue();
        Long endOffset = endOffsets.get(tp);
        return endOffset - beginningOffset;
    }).sum();
    System.out.println(totalCount);
}
Technically speaking you can simply consume all messages from the topic and count them:
Example:
kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic XYZ --partition 0
However, the kafka.tools.GetOffsetShell approach gives you the offsets and not the actual number of messages in the topic. This means that if the topic is compacted, you will get two different numbers depending on whether you count messages by consuming them or by reading offsets.
Topic compaction: https://kafka.apache.org/documentation.html#design_compactionbasics
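The gap between the two numbers can be shown with a small simulation. This is a simplified sketch, not how the broker implements compaction: it assumes compaction keeps only the most recent record for each key, and the class CompactionCountSketch and the sample keys are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactionCountSketch {
    // Simplified compaction: for each key, keep only the offset of its last record.
    // keysByOffset[i] is the key of the record written at offset i.
    static Map<String, Long> compact(String[] keysByOffset) {
        Map<String, Long> latestOffsetPerKey = new LinkedHashMap<>();
        for (int offset = 0; offset < keysByOffset.length; offset++) {
            latestOffsetPerKey.put(keysByOffset[offset], (long) offset);
        }
        return latestOffsetPerKey;
    }

    public static void main(String[] args) {
        // Six records were written at offsets 0..5, using three distinct keys.
        String[] keysByOffset = {"a", "b", "a", "c", "b", "a"};

        long beginningOffset = 0;
        long endOffset = keysByOffset.length; // offset the next record would get
        long countFromOffsets = endOffset - beginningOffset;

        long countByConsuming = compact(keysByOffset).size();

        System.out.println(countFromOffsets);  // prints 6
        System.out.println(countByConsuming);  // prints 3
    }
}
```

Offsets are never renumbered after compaction, so the offset difference still reports 6 even though only 3 records remain to be consumed.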