I am connecting to Kafka using the 0.8.2.1 kafka-clients library. I am able to successfully connect when Kafka is up, but I want to handle failure gracefully when Kafka is down. Here is my configuration:
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
kafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
producer = new KafkaProducer<String, String>(kafkaProperties);
When Kafka is down, I get the following error in my logs:
WARN: 07 Apr 2015 14:09:49.230 org.apache.kafka.common.network.Selector:276 - [] Error in I/O with localhost/127.0.0.1
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_75]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_75]
at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
This error repeats in an infinite loop and locks up my Java application. I have tried various configuration settings related to timeouts, retries, and acknowledgements, but I have been unable to prevent this loop from occurring.
Is there a configuration setting that can prevent this? Do I need to try a different version of the client? How can a Kafka outage be handled gracefully?
Processing failed messages can be achieved by cloning the message and republishing it to one of the retry topics, with updated information about the attempt number and the next retry timestamp. Consumers of the retry topics should block the thread until it is time to process the message; the producer side of that pattern is sketched below.
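A rough sketch of the republish step (the retry-topic naming scheme and the "attempt|notBefore|body" payload layout are invented for this example; the classes come from org.apache.kafka.clients.producer):

// Republish a failed message to a retry topic, carrying the attempt count
// and the earliest time at which it should be processed again.
public void scheduleRetry(KafkaProducer<String, String> producer,
                          String key, String body,
                          int attempt, long retryDelayMs) {
    String retryTopic = "orders-retry-" + attempt;            // e.g. orders-retry-1, orders-retry-2
    long notBefore = System.currentTimeMillis() + retryDelayMs;
    String payload = attempt + "|" + notBefore + "|" + body;  // retry metadata travels with the message
    producer.send(new ProducerRecord<String, String>(retryTopic, key, payload));
}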
If a consumer crashes or is shut down, its partitions will be reassigned to another member of the group, which will resume consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, the consumer that takes over its partitions falls back to the offset reset policy (auto.offset.reset).
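For reference, a minimal sketch of the consumer settings involved (the values follow the newer 0.9+ consumer API conventions, so they are not directly usable with the 0.8.2.1 client; the group id is made up):

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", kafkaUrl);
consumerProps.setProperty("group.id", "order-processors");    // members of a group share the topic's partitions
consumerProps.setProperty("enable.auto.commit", "false");     // commit explicitly only after a message is processed
consumerProps.setProperty("auto.offset.reset", "earliest");   // applied when no committed offset exists for a partition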
If a single broker fails, data should not be lost: for fault tolerance, each partition is replicated across several brokers. If the leader broker for a partition fails, the controller elects one of the replicas as the new leader.
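The producer also has to opt in to that durability guarantee; a minimal sketch, assuming the topic was created with a replication factor greater than one:

// Wait for the in-sync replicas to acknowledge each write, so a message
// acknowledged to the producer survives the loss of the partition leader.
kafkaProperties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
// Note: replication.factor and min.insync.replicas are set when the topic is
// created on the brokers, not in the producer configuration.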
I figured out that this combination of settings allows the Kafka client to fail quickly without holding the thread or spamming the logs:
kafkaProperties.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "300");   // give up on a send after 300 ms if metadata cannot be fetched
kafkaProperties.setProperty(ProducerConfig.TIMEOUT_CONFIG, "300");                  // how long the broker waits for follower acknowledgements
kafkaProperties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "10000");       // wait 10 s before retrying a failed request or metadata refresh
kafkaProperties.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000");   // wait 10 s before re-attempting a broken broker connection
I dislike that the Kafka client blocks the calling thread while trying to connect to the broker, rather than being fully asynchronous, but this at least is functional.
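To keep failures off the calling thread as far as possible, sends can also be made with a callback instead of blocking on the returned Future (with 0.8.2, send() may still block briefly while fetching metadata, which is what metadata.fetch.timeout.ms above bounds). A sketch with a hypothetical topic name and logger:

// Failures are reported asynchronously on the producer's I/O thread.
producer.send(new ProducerRecord<String, String>("events", key, value), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            log.warn("Send failed, queueing message for retry", exception);
        }
    }
});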