I'm trying to implement an asynchronous REST method of sending a message to Kafka in Spring MVC. Everything works, but when the server is unavailable, the onFailure event is processed for a long time. How to limit the response time in ListenableFuture for example to three seconds.
Here's my code:
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Value("${spring.kafka.topic}")
String topic;
@RequestMapping("/test")
DeferredResult<ResponseEntity<?>> test(
@RequestParam(value = "message") String message
) {
DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<>();
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, "testKey", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> sendResult) {
ResponseEntity<String> responseEntity = new ResponseEntity<>("SUCCESS", HttpStatus.OK);
deferredResult.setResult(responseEntity);
}
@Override
public void onFailure(Throwable ex) {
ResponseEntity<String> responseEntity = new ResponseEntity<>("FAILURE", HttpStatus.OK);
deferredResult.setResult(responseEntity);
}
});
return deferredResult;
}
I tried to use a REQUEST_TIMEOUT_MS_CONFIG
property of Kafka and .get(long timeout, TimeUnit unit)
method of ListenableFuture but havn't got desired result.
To handle the timeout exceptions, the general practice is: Rule out broker side issues. make sure that the topic partitions are fully replicated, and the brokers are not overloaded. Fix host name resolution or network connectivity issues if there are any.
timeout.ms is the timeout configured on the leader in the Kafka cluster. This is the timeout on the server side. For example if you have set the acks setting to all, the server will not respond until all of its followers have sent a response back to the leader.
size measures batch size in total bytes instead of the number of messages. It controls how many bytes of data to collect before sending messages to the Kafka broker. Set this as high as possible, without exceeding available memory. The default value is 16384.
Kafka requires one more thing. max.poll.interval.ms (default 5 minutes) defines the maximum time between poll invocations. If it's not met, then the consumer will leave the consumer group.
That's because the producer blocks for 60 seconds (by default).
See max.block.ms
in the KafkaDocumentation for producer configuration.
max.block.ms
The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.These methods can be blocked either because the buffer is full or metadata unavailable.Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With