Goal: read all messages from the topic then terminate the process.
I am able to continuously read the messages with the following:
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrapSrv);
props.put("group.id", group_id);
props.put("max.poll.records", 1); // Only get one record at a time. I understand that to read all messages this will need to be increased
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("MY_TOPIC"));
while (true) {
    // poll(Duration) replaces the deprecated poll(long); needs java.time.Duration
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        process_record(record);
    }
    consumer.commitSync(); // auto-commit is off, so commit after each batch
}
But in this case the process never terminates. When I get rid of the while (true) loop and run the program, it does not pick up a record from the topic (I would expect one record). Why is that?
A Kafka topic essentially materializes an infinite stream of events.
So when should you stop consuming from a topic? How do you know you have reached the end? The short answer is that you don't! In theory, a producer could always send a new message to the topic.
In practice, there are a few things you can do to stop at the end, assuming no (or few) new records are being appended.
Using endOffsets(), you can find the current end offset of each partition, which is the offset of the last record plus one. Once the consumer has reached those offsets for all the partitions it is assigned to, you can stop polling (or call endOffsets() again and see whether new messages have been sent).
You can retrieve the current position in each partition with the position() method. When consumed, each record also exposes its own offset via offset(). So you can use either of these to track progress towards the end offsets.
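To make the stopping condition concrete, here is a minimal sketch of the comparison. EndOffsetCheck and reachedEnd are hypothetical names of mine, not part of the Kafka client, and the type parameter P stands in for TopicPartition so the snippet runs without a broker.

```java
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical helper, not part of the Kafka client API.
class EndOffsetCheck {

    /**
     * Returns true once the position in every partition has reached the
     * end offset captured for it. P stands in for TopicPartition.
     */
    static <P> boolean reachedEnd(Map<P, Long> endOffsets, ToLongFunction<P> position) {
        for (Map.Entry<P, Long> entry : endOffsets.entrySet()) {
            if (position.applyAsLong(entry.getKey()) < entry.getValue()) {
                return false; // this partition still has records left to read
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in data: partition number -> end offset / current position
        Map<Integer, Long> endOffsets = Map.of(0, 5L, 1, 3L);
        Map<Integer, Long> positions = Map.of(0, 5L, 1, 2L);
        System.out.println(reachedEnd(endOffsets, positions::get)); // prints false
    }
}
```

With a real consumer you would probably capture consumer.endOffsets(consumer.assignment()) once, then call reachedEnd(end, consumer::position) after each poll() and break out of the loop when it returns true. Take the end offsets only after assignment() is non-empty; with an empty assignment the map is empty and the check passes immediately.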
Regarding your second question, about poll() returning nothing the first time it's called: that's expected. poll() is what makes the client do work, and on the first call it initiates a connection to the cluster and starts the group protocol (that takes a few seconds), so it's unlikely that any messages will already have been received before poll() returns.
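So even if you only want a single record, you still need to call poll() more than once. A rough sketch of a bounded retry follows; PollRetry and pollUntilRecords are hypothetical names of mine, and the supplier stands in for the real poll() call so the snippet runs without a broker.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper, not part of the Kafka client API.
class PollRetry {

    /**
     * Polls at least once, then keeps polling until hasRecords accepts a
     * batch or maxAttempts is used up. Returns the last batch either way.
     */
    static <B> B pollUntilRecords(Supplier<B> pollOnce, Predicate<B> hasRecords, int maxAttempts) {
        B batch = pollOnce.get();
        for (int attempt = 1; attempt < maxAttempts && !hasRecords.test(batch); attempt++) {
            batch = pollOnce.get();
        }
        return batch;
    }

    public static void main(String[] args) {
        // Stand-in for a consumer whose first two polls come back empty
        int[] calls = {0};
        Supplier<List<String>> fakePoll =
            () -> ++calls[0] < 3 ? List.<String>of() : List.of("record-1");
        List<String> batch = pollUntilRecords(fakePoll, b -> !b.isEmpty(), 10);
        System.out.println(batch + " after " + calls[0] + " polls"); // prints [record-1] after 3 polls
    }
}
```

Against a real consumer, the call might look like pollUntilRecords(() -> consumer.poll(Duration.ofMillis(500)), r -> !r.isEmpty(), 20). The attempt limit is an arbitrary choice of mine; it just keeps the process from hanging forever when the topic is empty.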