If I run the Kafka consumer inside a thread without manipulating it from outside (for example, putting the consumer to sleep or waking it up), is it necessary to handle the WakeupException properly? And what is a good approach to handling it?
The consumer runs in a web service that constantly pulls data out of a queue and should never stop doing so. Furthermore, the service has no idle or suspend state. The Kafka documentation points out that the exception is only thrown when a blocking operation of the Kafka consumer is preempted by another thread, but that will never happen here. https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/WakeupException.html
Kafka Version 0.10.0.0
catch (WakeupException e) {
    LOG.info("Kafka consumer wakeup exception");
    // Ignore exception if closing
    if (!closed.get()) {
        throw e;
    }
} finally {
    consumer.close();
}
Regards, Rakesh
Class WakeupException: Exception used to indicate preemption of a blocking operation by an external thread. For example, KafkaConsumer.wakeup() can be used to break out of an active KafkaConsumer.
If you want to consume only the latest messages from a Kafka topic, set “Auto Offset Reset” to “LATEST” and keep the other values as default. If you want to consume all the messages published to a Kafka topic, set “Auto Offset Reset” to “EARLIEST” and keep the other values as default.
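For the plain Java consumer, the setting described above corresponds to the auto.offset.reset property, whose values are written in lowercase ("latest" or "earliest"). The following is only a minimal sketch: the class name OffsetResetConfig, the broker address, and the group id are assumptions made for illustration and do not come from the original post.

```java
import java.util.Properties;

// Hypothetical helper class, named here only for illustration.
class OffsetResetConfig {

    // Builds consumer properties; only auto.offset.reset differs between the two cases.
    static Properties consumerProps(boolean latestOnly) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "example-group");           // hypothetical group id
        // "latest": start from new messages; "earliest": start from the oldest available one.
        props.put("auto.offset.reset", latestOnly ? "latest" : "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps(true).getProperty("auto.offset.reset"));
        System.out.println(consumerProps(false).getProperty("auto.offset.reset"));
    }
}
```

Note that auto.offset.reset only takes effect when the consumer group has no committed offset for a partition (or the committed offset is no longer valid); a group that has already committed offsets resumes from them regardless of this setting.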
Increasing the number of partitions and the number of brokers in a cluster will lead to increased parallelism of message consumption, which in turn improves the throughput of a Kafka cluster; however, the time required to replicate data across replica sets will also increase.
When an exception is thrown while consuming message number 2, messages 3 to 9 are skipped, and the next message to be processed is 10 (the first message in the next poll loop).
You can find a few examples in the Confluent documentation that show how to properly handle the WakeupException [Confluent Consumer Doc].
Basically, if you use consumer.poll(Integer.MAX_VALUE), the consumer will block until a message is fetched. In this case, if you would like to stop consumption, you can call consumer.wakeup() (from another thread) and catch the exception to shut down properly.
Also, if consumer.wakeup() is called while you are committing your offsets synchronously, the commit will throw a WakeupException.
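The shutdown pattern described here (a closed flag, a wakeup from another thread, and a close in finally) can be sketched as follows. To keep the sketch runnable without a broker, it uses a hypothetical in-memory FakeConsumer and a hypothetical WakeupSignal exception; they are stand-ins for KafkaConsumer and WakeupException, not part of the Kafka API. With the real client, the corresponding calls would be consumer.poll(...), consumer.wakeup() and consumer.close().

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
class WakeupSignal extends RuntimeException {}

// Hypothetical in-memory stand-in for KafkaConsumer: poll() blocks until a
// message arrives or wakeup() has been called from another thread.
class FakeConsumer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void send(String msg) { queue.add(msg); }

    String poll() throws InterruptedException {
        while (true) {
            // A pending wakeup aborts the blocking call with an exception.
            if (wakeupRequested.getAndSet(false)) throw new WakeupSignal();
            String msg = queue.poll(10, TimeUnit.MILLISECONDS);
            if (msg != null) return msg;
        }
    }

    void wakeup() { wakeupRequested.set(true); }
    void close() { closed.set(true); }
    boolean isClosed() { return closed.get(); }
}

class ConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final FakeConsumer consumer;
    final List<String> processed = new CopyOnWriteArrayList<>();

    ConsumerRunner(FakeConsumer consumer) { this.consumer = consumer; }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                processed.add(consumer.poll()); // blocks until data or wakeup
            }
        } catch (WakeupSignal e) {
            // Expected when shutdown() was called; otherwise it is a real error.
            if (!closed.get()) throw e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.close(); // always release the consumer
        }
    }

    // Called from another thread, for example a JVM shutdown hook.
    void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

class WakeupDemo {
    public static void main(String[] args) throws Exception {
        FakeConsumer consumer = new FakeConsumer();
        ConsumerRunner runner = new ConsumerRunner(consumer);
        Thread worker = new Thread(runner);
        worker.start();
        consumer.send("hello");
        Thread.sleep(200);      // give the worker time to process the message
        runner.shutdown();      // interrupts the blocking poll from this thread
        worker.join(2000);
        System.out.println("processed=" + runner.processed + ", closed=" + consumer.isClosed());
    }
}
```

Even for a service that should never stop, something like shutdown() is what you would call when the JVM or the web container is stopped, so the consumer gets closed cleanly instead of being abandoned in the middle of a poll. If wakeup() is never called from anywhere, the catch branch is simply never reached.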