
Kafka - stop retrying on ConnectException

I have a Kafka producer defined like this:

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public KafkaMessageProducer(String kafkaHost, String kafkaPort, Map<String, String> map) {
        this.kafkaTopic = map;
        final Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", kafkaHost + ":" + kafkaPort);
        producer = new KafkaProducer<String, String>(properties);
    }

And I am sending a message using the following code (I tried using a callback as well):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.producer.ProducerRecord;

public void sendMessage(String topic, RestCommonResource resultToken) {
        // Serialize the token to JSON and publish it; send() is asynchronous
        // and only enqueues the record for the background sender thread.
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode jsonNode = objectMapper.valueToTree(resultToken);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, jsonNode.toString());
        producer.send(record);
    }
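
The callback variant I tried looks roughly like this (a sketch; the error handling shown is illustrative, not my exact code):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public void sendMessageWithCallback(String topic, String payload) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, payload);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // Only invoked once the producer gives up on the record;
                    // it does not stop the client's internal reconnect loop.
                    exception.printStackTrace();
                }
            }
        });
    }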

But if the Kafka server is down and the producer publishes a message, the program gets stuck in an infinite loop with the following exception:

WARN  [2018-09-13 06:27:59,589] org.apache.kafka.common.network.Selector: Error in I/O with localhost/127.0.0.1
! java.net.ConnectException: Connection refused: no further information
! at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_80]
! at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) ~[na:1.7.0_80]
! at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
! at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]

Is there any property that can be set to stop this retrying and drop the message?

asked Sep 13 '18 by codingenious


People also ask

How do I stop Kafka retries?

In order to disable retries in the SeekToCurrentErrorHandler (STCEH) of Spring for Apache Kafka, use a FixedBackOff with 0 retries. The default BackOff is 9 retries (10 attempts) with no back-off.
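
A minimal sketch, assuming Spring for Apache Kafka 2.3+, where FixedBackOff(0L, 0L) means zero back-off and zero retries:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
public SeekToCurrentErrorHandler errorHandler() {
    // FixedBackOff(interval, maxAttempts): 0 ms between attempts, 0 retries,
    // so a failed record is handed to the error handler exactly once.
    return new SeekToCurrentErrorHandler(new FixedBackOff(0L, 0L));
}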

How do you handle failed messages in Kafka?

Send the message to a dedicated DLQ Kafka topic if any exception occurs. The failure cause should be added to the header of the Kafka message. The key and value should not be changed so that future re-processing and failure analysis of historical events are straightforward.
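
A minimal sketch of that pattern with the plain Java client (the topic name "my-topic.DLQ" and header key "failure-cause" are illustrative choices; record headers require kafka-clients 0.11+):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

void sendToDlq(Producer<String, String> producer, String key, String value, Exception cause) {
    // Keep the original key and value unchanged for later re-processing.
    ProducerRecord<String, String> dlqRecord =
            new ProducerRecord<String, String>("my-topic.DLQ", key, value);
    // Record why the original processing failed, for failure analysis.
    dlqRecord.headers().add("failure-cause",
            String.valueOf(cause.getMessage()).getBytes(StandardCharsets.UTF_8));
    producer.send(dlqRecord);
}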

How do I know if Kafka broker is running or not?

One of the quickest ways to find out if there are active brokers is by using ZooKeeper's dump command. The dump command is one of the 4LW commands available to administer a ZooKeeper server. On executing the command, we see a list of ephemeral broker ids registered with the ZooKeeper server.
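
A programmatic alternative is Kafka's AdminClient (a sketch, assuming a broker reachable on localhost:9092 and kafka-clients 0.11+): describeCluster() resolves with the set of live broker nodes only if at least one broker responds.

import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
        // Blocks until the cluster metadata request completes or fails.
        Collection<Node> brokers = admin.describeCluster().nodes().get();
        System.out.println("Live brokers: " + brokers);
    }
}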

Does Kafka support guaranteed delivery?

By default, Kafka provides at-least-once delivery guarantees. Users may implement at-most-once and exactly once semantics by customizing offset location recording. In any case, Kafka offers you the option for choosing which delivery semantics you strive to achieve.
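
As a sketch of how that choice shows up in producer configuration (the property names are from the standard client; idempotence requires kafka-clients 0.11+):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.ACKS_CONFIG, "all");                 // wait for all in-sync replicas
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");  // avoid duplicates on producer retries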


1 Answer

Currently, if a Kafka client loses its connection to the brokers, it will wait reconnect.backoff.ms milliseconds before attempting to reconnect.

While this strategy works well when a client is disconnected for a short time, if a single broker or the entire cluster becomes unavailable for a long time, all clients will quickly generate a lot of connection attempts.

In addition, developers have limited control over a client that constantly loses its connection to the cluster.

I think this topic will be useful for you: Add custom policies for reconnect attempts to NetworkClient.

reconnect.backoff.ms : The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.

reconnect.backoff.max.ms : The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
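
Note that these two settings only pace the reconnect attempts; to make the producer actually give up and drop a record, they are typically combined with the producer timeout settings. A minimal sketch (the values are illustrative, and delivery.timeout.ms requires kafka-clients 2.1+, much newer than the 0.8.2.1 client in the question):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

Properties properties = new Properties();
properties.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");      // first reconnect after 1 s
properties.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "10000"); // cap exponential backoff at 10 s
properties.put(ProducerConfig.RETRIES_CONFIG, "0");                      // do not resend failed batches
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");              // fail send() fast if metadata is unavailable
properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "15000");      // expire unsent records after 15 s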

answered Sep 24 '22 by Alihossein shahabi