Kafka - org.apache.kafka.common.errors.NetworkException

I have Kafka client code that connects to Kafka brokers (server 0.10.1, client 0.10.2). The code consumes 2 topics with 2 different consumer groups and also contains a producer. Once in a while (once in 2 days, once in 5 days, ...) the producer code throws a NetworkException. In the logs we see (Re)joining messages for both consumer groups, followed by the NetworkException from the producer's future.get() call. I am not sure why we are getting this error.

Code:

final Future<RecordMetadata> futureResponse =
    producer.send(new ProducerRecord<>("ping_topic", "ping"));
futureResponse.get();

Exception:

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
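
The trace shows that future.get() wraps the real failure in an ExecutionException, so the NetworkException is only reachable through getCause(). A minimal sketch of retrying on that cause follows, assuming the ping record is safe to send more than once. The class name RetryingSend, the method sendWithRetry and its parameters are hypothetical names for illustration, not part of the Kafka API, and the block deliberately uses only java.util.concurrent so it runs without the Kafka client.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical helper, not part of the Kafka client.
public final class RetryingSend {
    private RetryingSend() {}

    /**
     * Calls sendOnce up to maxAttempts times and waits on each returned future.
     * A failed attempt is retried only when the cause of the
     * ExecutionException satisfies isRetriable; any other failure is rethrown.
     */
    public static <T> T sendWithRetry(Callable<Future<T>> sendOnce,
                                      Predicate<Throwable> isRetriable,
                                      int maxAttempts,
                                      long backoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ExecutionException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return sendOnce.call().get();
            } catch (ExecutionException e) {
                // The original error (e.g. NetworkException) is the cause.
                if (!isRetriable.test(e.getCause())) {
                    throw e;
                }
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last;
    }
}
```

With the Kafka client on the classpath, the call might look like sendWithRetry(() -> producer.send(record), cause -> cause instanceof org.apache.kafka.common.errors.RetriableException, 3, 500L); NetworkException is a subclass of RetriableException. Note that a resend can create a duplicate record if the broker had already written the first one before the connection dropped.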

The Kafka API documentation describes NetworkException as:

"A misc. network-related IOException occurred when making a request. This could be because the client's metadata is out of date and it is making a request to a node that is now dead."
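
Since the description points at stale metadata or a dead connection, the error is usually transient, and the client classifies it as retriable. In 0.10.x clients the producer's retries setting defaults to 0, so a single dropped connection is reported straight to the caller. A minimal configuration sketch follows; the class name ProducerRetryConfig and the values 3 and 500 are illustrative assumptions rather than recommendations, while the property keys are standard producer config names.

```java
import java.util.Properties;

// Hypothetical helper that builds producer properties with retries enabled.
public final class ProducerRetryConfig {
    private ProducerRetryConfig() {}

    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Let the producer resend after retriable errors (illustrative value).
        props.put("retries", "3");
        // Pause between attempts so metadata can be refreshed (illustrative value).
        props.put("retry.backoff.ms", "500");
        // With retries enabled, one in-flight request keeps records in order.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor. Whether internal retries are acceptable depends on how the application tolerates duplicate records.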

Thanks

user1578872 asked Nov 08 '22 08:11


1 Answer

I had the same error while testing a Kafka consumer, using a sender template to produce the test messages. In the consumer configuration I additionally set the following properties:

 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);

After sending the message I added a thread sleep:

ListenableFuture<SendResult<String, String>> future =
    senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);

Thread.sleep(10000);

The sleep was necessary to make the test work, but it may not be suitable for your case.
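
If the sleep is only there to give the send time to complete, a bounded wait on the returned future may be an alternative, because Spring's ListenableFuture extends java.util.concurrent.Future and therefore offers get(timeout, unit). The sketch below assumes exactly that; the class name BoundedWait and the method await are hypothetical, and it does not help if the test is actually waiting for the consumer to receive the message.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: wait for a send to finish, but never longer than timeoutMs.
public final class BoundedWait {
    private BoundedWait() {}

    public static <T> T await(Future<T> future, long timeoutMs)
            throws ExecutionException, InterruptedException, TimeoutException {
        // Returns as soon as the future completes; throws TimeoutException
        // if it is still pending after timeoutMs milliseconds.
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

In the test above this would be called as BoundedWait.await(future, 10000), which returns immediately on success instead of always pausing for ten seconds.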

Laura Liparulo answered Nov 15 '22 10:11