
How can I gracefully handle a Kafka outage?

I am connecting to Kafka using the 0.8.2.1 kafka-clients library. I am able to successfully connect when Kafka is up, but I want to handle failure gracefully when Kafka is down. Here is my configuration:

kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
kafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
producer = new KafkaProducer<String, String>(kafkaProperties);

When Kafka is down, I get the following error in my logs:

WARN: 07 Apr 2015 14:09:49.230 org.apache.kafka.common.network.Selector:276 - [] Error in I/O with localhost/127.0.0.1
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_75]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_75]
at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]

This error repeats in an infinite loop and locks up my Java application. I have tried various configuration settings related to timeouts, retries, and acknowledgements, but I have been unable to prevent this loop from occurring.

Is there a configuration setting that can prevent this? Do I need to try a different version of the client? How can a Kafka outage be handled gracefully?

asked Apr 07 '15 by David Hansen

People also ask

How do you handle failed messages in Kafka?

Failed messages can be handled by cloning the message and republishing it to one of the retry topics, with updated information about the attempt number and the next retry timestamp. Consumers of the retry topics should wait until it is time to process the message.
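
The routing logic behind that pattern can be sketched as below. The topic names, backoff base, and maximum attempt count are illustrative assumptions, not part of the original answer; the actual republish to Kafka is left out.

```java
// Sketch of the retry-topic routing described above. Attempt 1 means the
// message has failed once; after MAX_ATTEMPTS it goes to a dead-letter topic.
public class RetryRouting {
    static final long BASE_BACKOFF_MS = 1_000L;
    static final int MAX_ATTEMPTS = 3;

    /** Topic a failed message should be republished to, or the DLQ when retries are exhausted. */
    static String nextTopic(int attempt) {
        return attempt >= MAX_ATTEMPTS ? "orders-dlq" : "orders-retry-" + attempt;
    }

    /** Earliest time the retry consumer may process the message (exponential backoff). */
    static long nextRetryAt(long nowMs, int attempt) {
        return nowMs + BASE_BACKOFF_MS * (1L << attempt);
    }

    public static void main(String[] args) {
        System.out.println(nextTopic(1));       // orders-retry-1
        System.out.println(nextRetryAt(0L, 2)); // 4000
        System.out.println(nextTopic(3));       // orders-dlq
    }
}
```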

What happens when Kafka consumer goes down?

If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.
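
The reset policy mentioned above is a consumer configuration setting. A minimal sketch, in which the group id and chosen values are assumptions rather than anything from the original question:

```java
import java.util.Properties;

// Illustrative consumer configuration for the takeover behavior described above.
public class ConsumerResetConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "order-processors");
        // Applied only when a partition has no committed offset (e.g. the
        // previous consumer crashed before its first commit): "earliest"
        // replays from the start of the log, "latest" skips to new messages.
        // (The 0.8.x high-level consumer uses "smallest"/"largest" instead.)
        props.setProperty("auto.offset.reset", "earliest");
        // Commit offsets automatically so a successor resumes near where we stopped.
        props.setProperty("enable.auto.commit", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("auto.offset.reset"));
    }
}
```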

What happens when Kafka broker fails?

If any broker fails, data should not be lost. For fault tolerance, each partition is replicated and stored on different brokers. If the leader broker for a partition fails, the controller elects one of the replicas as the new leader.
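
The settings that govern this fault tolerance can be sketched as below. The values are illustrative assumptions for a three-broker cluster, not taken from the question or answer:

```java
import java.util.Properties;

// Broker- and producer-side settings behind replication-based fault tolerance.
public class ReplicationConfig {
    static Properties brokerProps() {
        Properties broker = new Properties();
        // Keep every partition on 3 brokers, so one broker can fail without data loss.
        broker.setProperty("default.replication.factor", "3");
        // Refuse writes once fewer than 2 in-sync replicas remain.
        broker.setProperty("min.insync.replicas", "2");
        return broker;
    }

    static Properties producerProps() {
        Properties producer = new Properties();
        // -1 = wait for all in-sync replicas to acknowledge each write.
        producer.setProperty("acks", "-1");
        return producer;
    }

    public static void main(String[] args) {
        System.out.println(brokerProps().getProperty("default.replication.factor"));
        System.out.println(producerProps().getProperty("acks"));
    }
}
```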


1 Answer

I figured out that this combination of settings allows the kafka client to fail quickly without holding the thread or spamming the logs:

kafkaProperties.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "300"); // give up fetching topic metadata after 300 ms
kafkaProperties.setProperty(ProducerConfig.TIMEOUT_CONFIG, "300");                // broker acknowledgement timeout
kafkaProperties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "10000");     // wait 10 s between send retries
kafkaProperties.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000"); // wait 10 s between reconnection attempts

I dislike that the kafka client blocks the calling thread while trying to connect to the kafka server, rather than being fully asynchronous, but this is at least functional.
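
One way to keep the application thread from being held is to isolate the blocking send on a worker thread and bound the wait. A minimal sketch, where `blockingSend` is a stand-in assumption for `producer.send(...).get()`, not part of the Kafka API:

```java
import java.util.concurrent.*;

// Timebox a potentially blocking send so an outage fails fast for the caller.
public class TimeboxedSend {
    // Stand-in for the real blocking producer call; simulates a broker round trip.
    static String blockingSend(String msg) throws InterruptedException {
        Thread.sleep(50);
        return "acked:" + msg;
    }

    static String sendWithTimeout(String msg, long timeoutMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> blockingSend(msg);
            Future<String> f = pool.submit(task);
            return f.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "dropped:" + msg; // or queue locally for replay once Kafka is back
        } catch (Exception e) {
            return "failed:" + msg;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendWithTimeout("order-1", 500)); // acked:order-1
        System.out.println(sendWithTimeout("order-2", 5));   // dropped:order-2
    }
}
```

Dropping or locally queueing the message on timeout is an application-level choice; the point is that the caller gets control back within a bounded time.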

answered Sep 20 '22 by David Hansen