I have a kafka producer defined like
public KafkaMessageProducer(String kafkaHost, String kafkaPort, Map<String, String> map) {
this.kafkaTopic = map;
final Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", kafkaHost + ":" + kafkaPort);
producer = new KafkaProducer<String, String>(properties);
}
And I am sending message using following code. (tried using callback also).
public void sendMessage(String topic, RestCommonResource resultToken) {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.valueToTree(resultToken);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, jsonNode.toString());
producer.send(record);
}
But if kafka server is down and producer publishes a message, program gets stuck in infinite loop with following exception:
WARN [2018-09-13 06:27:59,589] org.apache.kafka.common.network.Selector: Error in I/O with localhost/127.0.0.1
! java.net.ConnectException: Connection refused: no further information
! at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_80]
! at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) ~[na:1.7.0_80]
! at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
! at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]
If there any property that can be set to stop this retrying and drop the message.
In order to disable retries in the STCEH, use a FixedBackOff with 0 retries. The default BackOff is 9 retries (10 attempts) with no back off.
Send the message to a dedicated DLQ Kafka topic if any exception occurs. The failure cause should be added to the header of the Kafka message. The key and value should not be changed so that future re-processing and failure analysis of historical events are straightforward.
One of the quickest ways to find out if there are active brokers is by using Zookeeper's dump command. The dump command is one of the 4LW commands available to administer a Zookeeper server. On executing the command, we see a list of ephemeral broker ids registered with the Zookeeper server.
By default, Kafka provides at-least-once delivery guarantees. Users may implement at-most-once and exactly once semantics by customizing offset location recording. In any case, Kafka offers you the option for choosing which delivery semantics you strive to achieve.
Currently if a Kafka client loses a connection with brokers it will wait for reconnect.backoff.ms milliseconds before attempting to reconnect.
While this strategy works well when a client is disconnected for a short time if a single broker or the entire cluster become unavailable for a long time all clients will quickly generate a lot of connections.
In addition, developers have limited control over a client which constantly loses its connections with the cluster.
I think this topic useful for you: Add custom policies for reconnect attempts to NetworkdClient
reconnect.backoff.ms : The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.
reconnect.backoff.max.ms : The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With