
How long does a Kafka producer stay alive between messages?

I am opening a Kafka producer with config properties:

KafkaProducer<String, MyValue> producer = new KafkaProducer<String, MyValue>(kafkaProperties);

I then send records synchronously (to avoid batching and to preserve the original message order):

   // create myValue instance (omitted for simplicity)
   // create myRecord instance using the topic name and myValue
   producer.send(myRecord).get();
   producer.flush();  // send message as soon as record is available to producer

My issue is that I have several records to send, and between sends I might have to wait a long time, from a few minutes to hours (for whatever reason; at the very least, I want to explore and understand Kafka better).

I want to know how long the producer's connection to the cluster/bootstrap server stays alive. Is there any way to configure it through the producer configuration?
(In-depth explanations are very welcome, even if they go down to the TCP connection level.)

(Kafka consumers have a heartbeat concept. Do producers have a similar concept? A Google search for "kafka producer heartbeat.interval.ms" returned results only for the consumer.)

joven asked Jan 01 '26 11:01


1 Answer

The KafkaProducer.send method is asynchronous: by default it adds records to an in-memory buffer and sends them in batches. By my reading of the docs, the producer therefore establishes the connection when it sends a batch to the cluster:

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the batch.size config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition).

By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you want to reduce the number of requests you can set linger.ms to something greater than 0.

This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP.

For example, in the code snippet above, likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer.

Note that records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency.
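As a rough illustration of the two settings quoted above, here is a minimal sketch that builds producer properties using only java.util.Properties. The class name ProducerBatchingConfig, the localhost:9092 address, and the StringSerializer value serializer are assumptions made for illustration (the question uses a custom MyValue type), and the values passed to build are arbitrary rather than recommendations.

```java
import java.util.Properties;

public class ProducerBatchingConfig {

    /**
     * Builds producer properties with explicit batching settings.
     * The broker address and serializers below are placeholders, not taken from the question.
     */
    public static Properties build(int lingerMs, int batchSizeBytes) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); // placeholder for MyValue
        // How long the producer may wait for more records before sending a batch.
        props.setProperty("linger.ms", Integer.toString(lingerMs));
        // Size in bytes of the per-partition buffer of unsent records.
        props.setProperty("batch.size", Integer.toString(batchSizeBytes));
        return props;
    }

    public static void main(String[] args) {
        // linger.ms=0 asks for batches to be sent without deliberately waiting.
        Properties props = build(0, 16384);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object could be passed to the KafkaProducer constructor in place of kafkaProperties from the question.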

From the KafkaProducer.flush documentation: invoking flush does not mean the producer sends each record to the cluster individually; it makes all buffered records immediately available to send:

Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true). A request is considered completed when it is successfully acknowledged according to the acks configuration you have specified or else it results in an error.
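To make that post-condition concrete, here is a toy model written with the Java standard library only. It is not the Kafka client: ToyProducer, its send and flush methods, and the made-up offsets are all hypothetical stand-ins, with a single background thread loosely playing the role of the producer's sender. The sketch only shows the shape of the contract: send returns a future immediately, and flush blocks until every future handed out so far is done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy stand-in for a producer; NOT the Kafka client API. */
public class ToyProducer implements AutoCloseable {
    // One background thread that "delivers" records in the order they were sent.
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private final List<CompletableFuture<Long>> inFlight = new ArrayList<>();
    private long nextOffset = 0; // only ever touched by the sender thread

    /** Queues the record and returns immediately with a future for its made-up offset. */
    public synchronized CompletableFuture<Long> send(String record) {
        CompletableFuture<Long> ack = CompletableFuture.supplyAsync(() -> nextOffset++, sender);
        inFlight.add(ack);
        return ack;
    }

    /** Blocks until every future returned by earlier send calls has completed. */
    public void flush() {
        List<CompletableFuture<Long>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(inFlight);
            inFlight.clear();
        }
        CompletableFuture.allOf(snapshot.toArray(new CompletableFuture[0])).join();
    }

    @Override
    public void close() {
        sender.shutdown();
    }

    public static void main(String[] args) {
        try (ToyProducer producer = new ToyProducer()) {
            CompletableFuture<Long> first = producer.send("a");
            CompletableFuture<Long> second = producer.send("b");
            producer.flush(); // after this, both futures are done
            System.out.println(first.isDone() && second.isDone());

            // Waiting on the future right after send is already a blocking send,
            // so the flush that follows has nothing left to wait for.
            long offset = producer.send("c").join();
            producer.flush();
            System.out.println(offset);
        }
    }
}
```

In this model, as in the pattern from the question, waiting on the future of each send already blocks until that record is acknowledged, so the extra flush call adds nothing.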

Deadpool answered Jan 03 '26 22:01


