I have a multi-threaded app which uses a producer class to produce messages. Earlier I was using the code below, where a new KafkaProducer was built for each request:
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            isValidMsg[0] = false;
            exception.printStackTrace();
            saveOrUpdateLog(msgBean, producerType, exception);
            logger.error("ERROR:Unable to produce message.", exception);
        }
    }
});
producer.close();
Then I read the Kafka producer docs and learned that we should use a single producer instance for good performance.
So I created a single instance of KafkaProducer inside a singleton class.
Now, when and where should we close the producer? Obviously, if we close the producer after the first send request, later sends can no longer use it and throw:
java.lang.IllegalStateException: Cannot send after the producer is closed.
Or how can we reconnect the producer once it is closed? And what happens if the program crashes or throws exceptions?
Generally, calling close() on the KafkaProducer is sufficient to make sure all inflight records have completed:
/**
* Close this producer. This method blocks until all previously sent requests complete.
* This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
* <p>
* <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
* will be called instead. We do this because the sender thread would otherwise try to join itself and
* block forever.</strong>
* <p>
*
* @throws InterruptException If the thread is interrupted while blocked
*/
If your producer is used throughout the lifetime of your application, don't close it until you get a termination signal, then call close(). As the documentation says, the producer is safe to use in a multi-threaded environment, so you should reuse the same instance.
If you're sharing your KafkaProducer in multiple threads, you have two choices:
1. Call close() from a shutdown callback registered via Runtime.getRuntime().addShutdownHook from your main execution thread.
2. Guard close() with a flag in a shared owner object, so that only one caller actually closes the producer.
A rough sketch of 2 would possibly look like this:
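import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

object KafkaOwner {
  // Producer configuration (bootstrap servers, serializers, ...); left open as in the original sketch.
  private val props: Properties = ???
  private var producer = new KafkaProducer[String, Array[Byte]](props)
  // Guarded by this.synchronized in both methods, so no @volatile is needed.
  private var isClosed = false

  def close(): Unit = this.synchronized {
    if (!isClosed) {
      producer.close()
      isClosed = true
    }
  }

  def instance: KafkaProducer[String, Array[Byte]] = this.synchronized {
    if (isClosed) {
      // The previous producer was closed, so create a fresh one.
      producer = new KafkaProducer[String, Array[Byte]](props)
      isClosed = false
    }
    producer
  }
}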
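Since the question uses Java, here is a minimal Java sketch of the same idea: one shared instance, a close that only the first caller performs, and an optional shutdown hook. The SharedResource class and its method names are hypothetical and not part of the Kafka API. It is written against AutoCloseable so that it compiles without the Kafka client on the classpath. Unlike the Scala sketch above, it does not recreate the resource after close; get() fails fast instead.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical holder: owns one shared, closeable resource and closes it at most once. */
final class SharedResource<T extends AutoCloseable> {
    private final T resource;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    SharedResource(Supplier<T> factory) {
        this.resource = factory.get();
    }

    /** Returns the shared instance, or fails fast if it was already closed. */
    T get() {
        if (closed.get()) {
            throw new IllegalStateException("resource already closed");
        }
        return resource;
    }

    /** Idempotent close: only the first caller actually closes the resource. */
    void close() {
        if (closed.compareAndSet(false, true)) {
            try {
                resource.close();
            } catch (Exception e) {
                throw new RuntimeException("failed to close resource", e);
            }
        }
    }

    /** Registers a JVM shutdown hook that closes the resource on termination. */
    Thread registerShutdownHook() {
        Thread hook = new Thread(this::close, "shared-resource-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

With Kafka, this would presumably be created once at startup as new SharedResource<>(() -> new KafkaProducer<String, byte[]>(prop)), followed by registerShutdownHook(), and request threads would call get().send(...). Note that a shutdown hook runs on normal termination and on signals such as SIGTERM, but not when the JVM is killed forcibly.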
As described in the javadoc for KafkaProducer:
public void close()
Close this producer. This method blocks until all previously sent requests complete.
This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).
src: https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()
So you don't need to worry that your messages won't be sent, even if you call close immediately after send.
If you plan to use a KafkaProducer more than once, then close it only after you've finished using it. If you still want a guarantee that your message has actually been sent before your method returns, rather than waiting in a buffer, then use KafkaProducer#flush(), which blocks until the records buffered so far have been sent. You can also block on Future#get() if you prefer.
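As an illustration of blocking on Future#get(), here is a small sketch of a helper that waits for a pending send with a timeout and turns failures into an unchecked exception. SendAwaiter and await are hypothetical names, not Kafka API; the helper works on any java.util.concurrent.Future, which is the type KafkaProducer#send returns.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for waiting on the Future returned by an asynchronous send. */
final class SendAwaiter {
    private SendAwaiter() {}

    /**
     * Blocks until the future completes or the timeout elapses.
     * Returns the result, or throws IllegalStateException wrapping the failure.
     */
    static <T> T await(Future<T> pending, long timeout, TimeUnit unit) {
        try {
            return pending.get(timeout, unit);
        } catch (ExecutionException e) {
            // The asynchronous operation itself failed; surface its cause.
            throw new IllegalStateException("send failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("send not acknowledged in time", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting for send", e);
        }
    }
}
```

A call would look roughly like SendAwaiter.await(producer.send(data), 10, TimeUnit.SECONDS). Blocking on every send gives up the batching that makes the producer fast, so this is probably best reserved for messages that must be confirmed before the method returns.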
There is also one caveat to be aware of if you don't plan to ever close your KafkaProducer (e.g. in short-lived apps, where you just send some data and the app terminates immediately after sending). The KafkaProducer IO thread is a daemon thread, which means the JVM will not wait for this thread to finish before terminating. So, to ensure that your messages are actually sent, use KafkaProducer#flush(), the no-arg KafkaProducer#close(), or block on Future#get().