How to set timeout for onFailure event (Spring, Kafka)?

I'm trying to implement an asynchronous REST endpoint in Spring MVC that sends a message to Kafka. Everything works, but when the Kafka server is unavailable, it takes a long time before the onFailure callback is invoked. How can I limit the response time of the ListenableFuture to, for example, three seconds?

Here's my code:

@Autowired
KafkaTemplate<String, String> kafkaTemplate;

@Value("${spring.kafka.topic}")
String topic;

@RequestMapping("/test")
DeferredResult<ResponseEntity<?>> test(
        @RequestParam(value = "message") String message
) {

    DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<>();
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, "testKey", message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> sendResult) {
            ResponseEntity<String> responseEntity = new ResponseEntity<>("SUCCESS", HttpStatus.OK);
            deferredResult.setResult(responseEntity);
        }

        @Override
        public void onFailure(Throwable ex) {
            ResponseEntity<String> responseEntity = new ResponseEntity<>("FAILURE", HttpStatus.OK);
            deferredResult.setResult(responseEntity);
        }

    });

    return deferredResult;
}

I tried using Kafka's REQUEST_TIMEOUT_MS_CONFIG property and the .get(long timeout, TimeUnit unit) method of ListenableFuture, but neither gave the desired result.

V. Perfilev asked Mar 26 '18 09:03


1 Answer

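That's because the producer's send() call blocks for up to 60 seconds (by default) when it cannot obtain metadata, which is what happens when the broker is unavailable.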

See max.block.ms in the Kafka documentation for producer configuration.

max.block.ms: The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block. These methods can be blocked either because the buffer is full or metadata unavailable. Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.
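
As a rough sketch of how that setting could be lowered to the three seconds mentioned in the question: the class and method names below (ProducerTimeoutConfig, producerProps) are made up for illustration, and the broker address and String serializers are assumptions, not taken from the question. It uses only plain string keys so that it compiles without the Kafka client on the classpath. In a Spring Kafka application the resulting map would typically be handed to a DefaultKafkaProducerFactory, which backs the KafkaTemplate.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds producer properties with a short max.block.ms.
class ProducerTimeoutConfig {

    // maxBlockMs bounds how long KafkaProducer.send() may block waiting for
    // metadata or buffer space (the Kafka default is 60000 ms).
    static Map<String, Object> producerProps(String bootstrapServers, long maxBlockMs) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumed serializers, matching KafkaTemplate<String, String> in the question.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", maxBlockMs);
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Map<String, Object> props = producerProps("localhost:9092", 3000L);
        System.out.println("max.block.ms = " + props.get("max.block.ms"));
    }
}
```

With a value like this, a send() against an unreachable broker should give up after roughly three seconds instead of sixty, so the failure reaches the onFailure callback (or is thrown, depending on the Spring Kafka version) much sooner.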

Gary Russell answered Sep 19 '22 09:09