Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka Producer TimeOutException

I am running a Samza stream job that is writing data to Kafka topic. Kafka is running a 3 node cluster. Samza job is deployed on yarn. We are seeing lot of these exceptions in container logs :

 INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[ContainerHeartbeatMonitor:stop:61] - [main] - Stopping ContainerHeartbeatMonitor
ERROR [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.runtime.LocalContainerRunner:[LocalContainerRunner:run:107] - [main] - Container stopped with Exception. Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:694)
        at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:104)
        at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:149)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:181)
        at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
        at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for Topic3-16 due to 30332 ms has passed since last attempt plus backoff time

These 3 types of exceptions are coming a lot.

59088 org.apache.kafka.common.errors.TimeoutException: Expiring 115 record(s) for Topic3-1 due to 30028 ms has passed since last attempt plus backoff time

61015 org.apache.kafka.common.errors.TimeoutException: Expiring 60 record(s) for Topic3-1 due to 74949 ms has passed since batch creation plus linger time

62275 org.apache.kafka.common.errors.TimeoutException: Expiring 176 record(s) for Topic3-4 due to 74917 ms has passed since last append

Please help me understand what is the issue here. Whenever its happened Samza container is getting restarted.

like image 318
Anuj jain Avatar asked Nov 09 '18 09:11

Anuj jain


People also ask

How do you handle Org Apache Kafka common errors Timeoutexception?

To handle the timeout exceptions, the general practice is: Rule out broker side issues. make sure that the topic partitions are fully replicated, and the brokers are not overloaded. Fix host name resolution or network connectivity issues if there are any.

What is Kafka producer batch size?

The default is 16KB . Increasing a batch size to 32KB or 64KB can help increase the compression, throughput, and efficiency of requests. Any message that is bigger than the batch size will not be batched.

Can a Kafka producer write to multiple partitions?

Just like that, Apache Kafka Topic has two ends, Producers and Consumers. A Producer creates messages, and sends them in one of the Partitions of a Topic. A Consumer on the other hand reads the messages from the Partitions of a Topic. Multiple Partitions or channels are created to increase redundancy.


1 Answers

The error indicates that some records are put into the queue at a faster rate than they can be sent from the client.

When your Producer sends messages, they are stored in buffer (before sending the to the target broker) and the records are grouped together into batches in order to increase throughput. When a new record is added to the batch, it must be sent within a -configurable- time window which is controlled by request.timeout.ms (the default is set to 30 seconds). If the batch is in the queue for longer time, a TimeoutException is thrown and the batch records will then be removed from the queue and won't be delivered to the broker.

Increasing the value of request.timeout.ms should do the trick for you.

In case this does not work, you can also try decreasing batch.size so that batches are sent more often (but this time will include fewer messages) and make sure that linger.ms is set to 0 (which is the default value).

Note that you need to restart your kafka brokers after changing any configuration parameter.

If you still get the error I assume that something wrong is going on with your network. Have you enabled SSL?

like image 174
Giorgos Myrianthous Avatar answered Nov 15 '22 10:11

Giorgos Myrianthous