
No exception is thrown when sending a message while Kafka is down

I have a scenario where Kafka goes down randomly. When I send a message to Kafka while it is down, I want to handle that case separately. But when I send a message while Kafka is down, no exception is thrown, so I am not able to identify whether Kafka is down or not. Is there any way I can catch an exception and recover the lost message?

I have very basic code:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public static void main(String[] args) {
    String topicName = "test-1";

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 10; i++) {
        // fire-and-forget: send() returns a Future that is never checked
        producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)));
    }
    System.out.println("Message sent successfully");
    producer.close();
}
asked Feb 01 '19 by Deepank Porwal

People also ask

What happens if Kafka is down?

If Kafka is unavailable to send messages to, then no external activity has taken place. For such systems, a Kafka outage might mean that you do not accept new transactions. In that case, it may be reasonable to return an error message and allow the external third party to retry later.
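As a rough sketch of that pattern (not from the original post; the SubmitHandler name and the 5-second timeout are illustrative assumptions), you can bound the wait on the broker's acknowledgement and surface the failure to the caller:

import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SubmitHandler {
    private final Producer<String, String> producer;

    public SubmitHandler(Producer<String, String> producer) {
        this.producer = producer;
    }

    // Returns false when Kafka does not acknowledge in time, so the
    // caller can reject the transaction and let the third party retry.
    public boolean submit(String topic, String key, String value) {
        try {
            producer.send(new ProducerRecord<>(topic, key, value))
                    .get(5, TimeUnit.SECONDS); // bound the wait on the ack
            return true;
        } catch (Exception e) {
            return false; // Kafka unreachable or send failed
        }
    }
}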

What happens if Kafka producer fails?

The producer may fail to push a message to a topic due to a network partition or unavailability of the Kafka cluster. In such cases there is a high chance of messages being lost, hence we need a retry mechanism to avoid loss of data.
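The producer's built-in retry mechanism is driven by configuration; a hedged sketch of the relevant settings (the values shown are illustrative, not recommendations):

props.put("acks", "all");                 // wait for all in-sync replicas
props.put("retries", Integer.MAX_VALUE);  // retry transient send failures
props.put("retry.backoff.ms", 500);       // pause between retry attempts
props.put("delivery.timeout.ms", 120000); // total time before send() gives up and fails the record
props.put("enable.idempotence", true);    // retries must not duplicate records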

What happens when Kafka consumer throws exception?

When an exception is thrown while consuming message number 2, messages 3 to 9 are skipped, and the next message to be processed is 10 (the first message in the next poll loop).
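If skipping is not acceptable, one common workaround is to seek the partition back to the failed record before the next poll; a rough sketch, assuming an already-configured KafkaConsumer<String, String> and a hypothetical process() method:

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    try {
        process(record); // hypothetical processing step
    } catch (Exception e) {
        // Rewind so the next poll() re-delivers the failed record
        consumer.seek(new TopicPartition(record.topic(), record.partition()),
                      record.offset());
        break; // abandon the rest of this batch
    }
}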

What happens if Kafka topic is full?

The cleanup.policy property from the topic config, which is delete by default, says that "the delete policy will discard old segments when their retention time or size limit has been reached." So if you send records with the producer API and the topic gets full, Kafka will discard old segments.
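Those limits are per-topic settings; a hedged AdminClient sketch (the ~1 GiB value is just an example) that caps retention.bytes on the test-1 topic, to be run inside a method declared throws Exception:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

try (Admin admin = Admin.create(props)) {
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-1");
    AlterConfigOp op = new AlterConfigOp(
            new ConfigEntry("retention.bytes", "1073741824"), // ~1 GiB size limit
            AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(
            Collections.singletonMap(topic, Collections.singletonList(op)))
         .all().get(); // wait for the broker to apply the change
}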


1 Answer

You need to make the send call synchronous in order to get a response back from Kafka.

for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i));
    try {
        producer.send(record).get(); // block until Kafka acks; throws if the send failed
    } catch (Exception e) {
        e.printStackTrace();
        // Handle the message that has failed
    }
}

The .get() call blocks until the response from Kafka arrives. It throws an exception when the record has failed to reach Kafka; when the record has been sent successfully, a RecordMetadata is returned.
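If you would rather not block on every send, you can instead pass a Callback to send() and inspect the exception there; a sketch:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Delivery failed, e.g. because Kafka is down
        exception.printStackTrace();
        // Handle or requeue the failed message here
    } else {
        System.out.println("Sent to partition " + metadata.partition()
                + " at offset " + metadata.offset());
    }
});

Note that the callback fires only once the record is acknowledged or its delivery timeout expires, so a broker outage may surface with a delay.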

answered Sep 22 '22 by Giorgos Myrianthous