
How can I handle IOException when Kafka is down?

I am trying to publish a message while Apache Kafka is down. How should I handle an emergency situation like this?

The KafkaProducer::send() method doesn't throw any exceptions that I could handle. The producer swallows them and logs errors instead, so I get flooded with messages like the one below, and everything hangs until Kafka is reachable again.

2014-03-31 09:38:23.752 ERROR o.a.kafka.common.network.Selector - Error in I/O: 
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_51]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_51]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:205) ~[kafka-clients-0.8.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212) [kafka-clients-0.8.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:150) [kafka-clients-0.8.1.jar:na]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
Grzegorz Piwowarek asked Mar 31 '14 07:03


People also ask

How do you handle Kafka failure?

You can deal with transient send failures in several ways: drop the failed messages; exert backpressure further up the application and retry the sends; or write all messages to alternative local storage, from which they are ingested into Kafka asynchronously.

How do you handle Kafka timeout?

To handle timeout exceptions, the general practice is to rule out broker-side issues first: make sure that the topic partitions are fully replicated and that the brokers are not overloaded. Then fix any host name resolution or network connectivity issues.

What happens when Kafka consumer throws exception?

If an exception is thrown while consuming message number 2, messages 3 to 9 are skipped, and the next message to be processed is message 10 (the first message in the next poll loop).


1 Answer

Call get() on the future returned by Producer.send(...), or if you don't want to block your code, pass a callback.

    try {
        // Block until the broker acknowledges the record
        producer.send(new ProducerRecord("mytopic", key, value)).get();
    } catch (Exception e) {
        // Handle the case where the message wasn't acknowledged
    }
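Since the question mentions that everything hangs while Kafka is unreachable, a bounded wait may be more useful than a plain get(). The following is a minimal sketch, not part of the Kafka client: SendGuard, Outcome and awaitAck are hypothetical names, and it assumes only that the object returned by send() is a standard java.util.concurrent.Future. The demo in main uses CompletableFuture as a stand-in for the producer's future so that it runs without a broker.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not a Kafka API): waits for an acknowledgement with a deadline. */
final class SendGuard {

    /** What happened while waiting for the acknowledgement. */
    enum Outcome { ACKED, FAILED, TIMED_OUT, INTERRUPTED }

    /**
     * Waits at most timeoutMs for the given future, e.g. the one returned by
     * producer.send(record), and reports the result instead of blocking forever.
     */
    static Outcome awaitAck(Future<?> ack, long timeoutMs) {
        try {
            ack.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.ACKED;
        } catch (ExecutionException e) {
            // The send failed; e.getCause() holds the underlying error.
            return Outcome.FAILED;
        } catch (TimeoutException e) {
            // No acknowledgement arrived in time, e.g. the broker is unreachable.
            return Outcome.TIMED_OUT;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return Outcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the future returned by send(): acknowledged, failed, never completed.
        CompletableFuture<String> acked = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IOException("Connection refused"));
        CompletableFuture<String> pending = new CompletableFuture<>();

        System.out.println(awaitAck(acked, 100));   // ACKED
        System.out.println(awaitAck(failed, 100));  // FAILED
        System.out.println(awaitAck(pending, 100)); // TIMED_OUT
    }
}
```

With a real producer, the call would look like SendGuard.awaitAck(producer.send(record), 5000), where the 5000 ms deadline is an arbitrary illustration. On TIMED_OUT or FAILED the caller can then decide whether to retry, drop the message, or store it elsewhere.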

Peter Davis answered Sep 17 '22 18:09