I need to prevent the Kafka consumer from timing out while the application waits for a particular process to complete. My approach is to pause the partitions and then resume them once the process has completed.
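// Pause every partition currently assigned to this consumer.
List<TopicPartition> partitionList = new ArrayList<>(kafkaConsumer.assignment());
kafkaConsumer.pause(partitionList);
// processCompleted() is a placeholder for your own "is the process done?" check.
while (!processCompleted()) {
    Thread.sleep(10000);
    // All partitions are paused, so this poll fetches no records.
    kafkaConsumer.poll(0);
}
kafkaConsumer.resume(partitionList);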
Questions
Does pause() send heartbeats to Kafka automatically, or do I still need to poll at regular intervals to send the heartbeat?
Is this the best approach, or is there a better way of doing it?
Since Kafka 0.10.1, consumers do have a background thread for sending heartbeats: https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
Thus, you don't need to call poll() to send heartbeats to the brokers. However, there is a second timeout, max.poll.interval.ms: you must call poll() within this interval to avoid it. The default value is 5 minutes. If your wait is longer than that, you can simply increase this timeout, and then you don't need to pause any partitions at all.
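As a rough illustration of raising that timeout, here is a minimal sketch that builds consumer properties with max.poll.interval.ms sized to the longest wait you expect between two poll() calls. The class name, the consumerProps helper, the broker address, and the group id are all assumptions made for the example; only the max.poll.interval.ms key comes from the discussion above.

```java
import java.time.Duration;
import java.util.Properties;

public class ConsumerTimeoutConfig {

    // Builds consumer properties whose max.poll.interval.ms covers the longest
    // expected gap between two poll() calls. Hypothetical helper, not a Kafka API.
    static Properties consumerProps(Duration longestWait) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "example-group");           // assumed group name
        // The value is given in milliseconds.
        props.setProperty("max.poll.interval.ms", Long.toString(longestWait.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        // Allow up to 15 minutes between polls instead of the default 5.
        Properties props = consumerProps(Duration.ofMinutes(15));
        System.out.println(props.getProperty("max.poll.interval.ms"));
    }
}
```

You would pass these properties, together with your deserializer settings, to the KafkaConsumer constructor. Pick a value comfortably above your real worst-case wait, since a consumer that exceeds it is removed from the group.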
If you are using an older version, your approach of pausing and calling poll() regularly is the only way to send regular heartbeats and avoid the timeout.