 

Kafka Consumer WakeupException Handling Java


If I run the Kafka consumer inside a thread and never manipulate it from outside (for example, by putting the consumer to sleep or waking it up), is it necessary to handle the WakeupException properly? And what is a good approach to handling it?

The consumer runs in a web service that constantly pulls data out of a queue and should never stop doing so. Furthermore, the service has no idle or suspended state. The Kafka documentation points out that the exception is only thrown when a blocking operation of the consumer is preempted by another thread, but that will never happen. https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/WakeupException.html

Kafka Version 0.10.0.0

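try {
    // ... poll loop (not shown in the question) ...
} catch (WakeupException e) {
    LOG.info("Kafka consumer wakeup exception");
    // Ignore the exception if we are closing; otherwise rethrow it
    if (!closed.get()) {
        throw e;
    }
} finally {
    // Always release the consumer's resources
    consumer.close();
}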

Regards, Rakesh

Rakesh Kachhadiya asked Jun 09 '16 13:06




1 Answer

You can find a few examples in the Confluent documentation that show how to properly handle the WakeupException [Confluent Consumer Doc].

Basically, if you use consumer.poll(Integer.MAX_VALUE), the consumer will block until a message is fetched. In that case, if you would like to stop consuming, you can call consumer.wakeup() from another thread and catch the exception to shut down properly.

Also, if consumer.wakeup() is called while you are committing your offsets synchronously, the commit call will throw a WakeupException.
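The shutdown pattern described here can be sketched without a broker. In the sketch below, WakeableSource, WakeupSignal, InMemorySource, ConsumerLoop and WakeupDemo are all hypothetical names invented for illustration; they stand in for KafkaConsumer (its poll(), wakeup() and close() methods) and WakeupException. It is only meant to show the control flow under those assumptions, not how Kafka implements it.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
class WakeupSignal extends RuntimeException {}

// Hypothetical stand-in for the three KafkaConsumer methods the pattern relies on.
interface WakeableSource {
    List<String> poll(long timeoutMs); // blocks; throws WakeupSignal once wakeup() was called
    void wakeup();                     // the only method meant to be called from another thread
    void close();
}

// In-memory fake so the sketch runs without a broker (an assumption, not Kafka code).
class InMemorySource implements WakeableSource {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    volatile boolean closed = false;

    void send(String record) { queue.add(record); }

    @Override
    public List<String> poll(long timeoutMs) {
        try {
            for (long waited = 0; waited < timeoutMs; waited += 10) {
                // A pending wakeup aborts the blocking call.
                if (wakeupRequested.getAndSet(false)) throw new WakeupSignal();
                String record = queue.poll(10, TimeUnit.MILLISECONDS);
                if (record != null) return Collections.singletonList(record);
            }
            return Collections.emptyList();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    @Override public void wakeup() { wakeupRequested.set(true); }
    @Override public void close() { closed = true; }
}

class ConsumerLoop implements Runnable {
    private final WakeableSource consumer;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    final List<String> processed = new CopyOnWriteArrayList<>();

    ConsumerLoop(WakeableSource consumer) { this.consumer = consumer; }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                processed.addAll(consumer.poll(Long.MAX_VALUE));
            }
        } catch (WakeupSignal e) {
            // Expected during shutdown; if we are not closing, it is a real error.
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Called from another thread: set the flag first, then break the blocking poll.
    void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

class WakeupDemo {
    static String runDemo() {
        InMemorySource source = new InMemorySource();
        ConsumerLoop loop = new ConsumerLoop(source);
        Thread worker = new Thread(loop);
        worker.start();
        source.send("a");
        source.send("b");
        try {
            while (loop.processed.size() < 2) Thread.sleep(5); // wait for both records
            loop.shutdown();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "processed=" + loop.processed + " closed=" + source.closed;
    }
}
```

Calling WakeupDemo.runDemo() processes both records, stops the worker thread and closes the source. Setting the closed flag before calling wakeup() is what lets the catch block tell an intended shutdown apart from an unexpected wakeup. If nothing ever calls wakeup(), as in the question, the catch block never runs, but keeping it costs little and gives the service a clean way to stop later (for example, from a JVM shutdown hook).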

fhussonnois answered Sep 28 '22 02:09