
Counting Number of messages stored in a kafka topic

I'm using Kafka 0.9.0.0 and I want to count the number of messages in a topic without using the admin script kafka-console-consumer.sh.

I have tried all the commands in the answers to "Java, How to get number of messages in a topic in apache kafka", but none of them yields the result. Can anyone help me out here?

jack AKA karthik asked Jan 22 '17 15:01




2 Answers

You could try to execute the command below:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1 

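Then sum the offsets reported for each partition. With --time -1 these are the latest offsets, so the sum equals the message count only if nothing has been deleted from the topic yet. If retention has already removed older messages, run the command again with --time -2 (earliest offsets) and subtract those values partition by partition.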
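As a rough sketch of that summing step: the class below takes the text printed by the command above, assuming one "topic:partition:offset" line per partition (the default when only one offset per partition is requested), once for --time -1 and once for --time -2, and adds up the per-partition differences. OffsetSum, parse and count are names made up for this illustration, and the sample offsets are invented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: turns GetOffsetShell output into a message count.
final class OffsetSum {

    // Parses lines such as "test-topic:0:120" into partition -> offset.
    static Map<Integer, Long> parse(String shellOutput) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (String line : shellOutput.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines
            }
            // The last two fields are taken as partition and offset.
            String[] parts = trimmed.split(":");
            int partition = Integer.parseInt(parts[parts.length - 2]);
            long offset = Long.parseLong(parts[parts.length - 1]);
            offsets.put(partition, offset);
        }
        return offsets;
    }

    // Sums (latest - earliest) over all partitions found in the latest output.
    static long count(String latestOutput, String earliestOutput) {
        Map<Integer, Long> latest = parse(latestOutput);
        Map<Integer, Long> earliest = parse(earliestOutput);
        long total = 0;
        for (Map.Entry<Integer, Long> e : latest.entrySet()) {
            // A partition missing from the earliest output is treated as starting at 0.
            long start = earliest.getOrDefault(e.getKey(), 0L);
            total += e.getValue() - start;
        }
        return total;
    }

    public static void main(String[] args) {
        // Invented sample output for a three-partition topic.
        String latest = "test-topic:0:120\ntest-topic:1:95\ntest-topic:2:110\n";
        String earliest = "test-topic:0:20\ntest-topic:1:0\ntest-topic:2:10\n";
        System.out.println(count(latest, earliest)); // (120-20) + (95-0) + (110-10) = 295
    }
}
```

If the topic has never had anything deleted, the earliest offsets are all 0 and this reduces to simply summing the latest offsets.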

Updated: Java implementation

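import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Note: beginningOffsets()/endOffsets() need kafka-clients 0.10.1 or later and
// poll(Duration) needs 2.0 or later, so this does not compile against the 0.9.0.0 client.
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("your_topic"));
    // Poll until partitions have been assigned to this consumer.
    Set<TopicPartition> assignment;
    while ((assignment = consumer.assignment()).isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
    final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment);
    assert (endOffsets.size() == beginningOffsets.size());
    assert (endOffsets.keySet().equals(beginningOffsets.keySet()));

    // Per partition: end offset minus beginning offset, summed over all partitions.
    long totalCount = beginningOffsets.entrySet().stream().mapToLong(entry -> {
        TopicPartition tp = entry.getKey();
        long beginningOffset = entry.getValue();
        long endOffset = endOffsets.get(tp);
        return endOffset - beginningOffset;
    }).sum();
    System.out.println(totalCount);
}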
amethystic answered Sep 24 '22 15:09


Technically speaking you can simply consume all messages from the topic and count them:

Example:

kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic XYZ --partition 0

However, the kafka.tools.GetOffsetShell approach gives you offsets, not the actual number of messages in the topic. If the topic has been compacted, counting messages by consuming them and counting them by reading offsets will give two different numbers.

Topic compaction: https://kafka.apache.org/documentation.html#design_compactionbasics
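To make the difference concrete, here is a toy model in plain Java (no Kafka API involved). It assumes compaction keeps only the newest record per key and that surviving records keep their original offsets; CompactionDemo, Rec, compact and demo are names invented for this sketch, and real compaction works on log segments in the background rather than on the whole log at once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a compacted partition, not Kafka's implementation.
final class CompactionDemo {

    // A record keeps the offset it was written at, even after compaction.
    static final class Rec {
        final long offset;
        final String key;

        Rec(long offset, String key) {
            this.offset = offset;
            this.key = key;
        }
    }

    // Keeps only the newest record for each key, in offset order.
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> newest = new LinkedHashMap<>();
        for (Rec r : log) {
            newest.remove(r.key); // drop the older record for this key, if any
            newest.put(r.key, r); // re-insert so survivors stay ordered by offset
        }
        return new ArrayList<>(newest.values());
    }

    // Returns { count derived from offsets, count of records actually left }.
    static long[] demo() {
        String[] keys = {"a", "b", "a", "c", "b", "a"};
        List<Rec> log = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            log.add(new Rec(i, keys[i]));
        }
        long beginningOffset = 0;     // assumed unchanged by compaction in this toy
        long endOffset = log.size();  // next offset to be written
        List<Rec> compacted = compact(log);
        return new long[] {endOffset - beginningOffset, compacted.size()};
    }

    public static void main(String[] args) {
        long[] result = demo();
        System.out.println("by offsets: " + result[0]);   // 6
        System.out.println("by consuming: " + result[1]); // 3
    }
}
```

Six records were written, so the offset arithmetic reports 6, but only the newest record for each of the three keys is left to consume.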

Cosmos answered Sep 24 '22 15:09