
Spring Kafka asynchronous send calls block

I'm using Spring-Kafka version 1.2.1 and, when the Kafka server is down/unreachable, the asynchronous send calls block for a time. It seems to be the TCP timeout. The code is something like this:

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
    @Override
    public void onSuccess(SendResult<K, V> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }
});

I took a quick look at the Spring-Kafka code, and it seems to just pass the task along to the Kafka client library, translating the client's callback into a future. The Kafka client code is more complex and I didn't take the time to understand it all, but I suspect it makes remote calls (for metadata, at least?) on the calling thread.

As a user, I expected the Spring-Kafka methods that return a future to return immediately, even if the remote kafka server is unreachable.

I'd welcome confirmation of whether my understanding is wrong or this is a bug. For now, I've ended up making the call asynchronous on my end.
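The question does not show that workaround. A minimal sketch of one way to do it with only the JDK, assuming the goal is simply that the caller's thread never waits: the possibly blocking call is handed to a dedicated executor, so the caller gets a future back immediately. `BlockingSender` is a hypothetical stand-in for `kafkaTemplate.send(...)`, and the 2-second sleep only simulates the stall; adapting Spring's `ListenableFuture` to it is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffloadedSend {

    /** Hypothetical stand-in for a send call that may block before returning its future. */
    public interface BlockingSender {
        CompletableFuture<String> send(String topic, String message);
    }

    private final BlockingSender sender;
    private final ExecutorService executor;

    public OffloadedSend(BlockingSender sender, ExecutorService executor) {
        this.sender = sender;
        this.executor = executor;
    }

    /** Returns at once; any blocking inside sender.send() happens on an executor thread. */
    public CompletableFuture<String> sendAsync(String topic, String message) {
        return CompletableFuture
                .supplyAsync(() -> sender.send(topic, message), executor)
                .thenCompose(inner -> inner); // flatten CompletableFuture<CompletableFuture<String>>
    }

    public static void main(String[] args) {
        // Simulated sender that stalls for 2 s (as if waiting for metadata) before returning.
        BlockingSender slowSender = (topic, message) -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return CompletableFuture.failedFuture(e);
            }
            return CompletableFuture.completedFuture(topic + ":" + message);
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        OffloadedSend client = new OffloadedSend(slowSender, executor);

        long start = System.nanoTime();
        CompletableFuture<String> result = client.sendAsync("demo-topic", "hello");
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("sendAsync returned after " + elapsedMs + " ms");

        System.out.println("result: " + result.join()); // waits here only for the demo
        executor.shutdown();
    }
}
```

A dedicated executor is used rather than the default `ForkJoinPool.commonPool()`, since parking blocking calls on the common pool can starve other asynchronous work in the application.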

Another problem: the Spring-Kafka documentation says, near the beginning, that it provides both synchronous and asynchronous send methods, but I couldn't find any methods that don't return futures. Maybe the documentation needs updating.

I'm happy to provide any further details if needed. Thanks.

Carlos E. L. Augusto asked Jul 13 '17 15:07



2 Answers

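In addition to the @EnableAsync annotation on a configuration class, the @Async annotation needs to be on the method where you invoke this code. With Spring's default proxy-based mode, @Async only takes effect when that method is called from another bean through the proxy, not from within the same class.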

http://www.baeldung.com/spring-async

Here are some code fragments. The Kafka producer config:

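import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.annotation.EnableAsync;

@EnableAsync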
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
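        // A producer needs a serializer, not a deserializer; the instances passed to the factory below take precedence anyway
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);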
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer<>(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
        return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
    }

    @Bean
    public Producer producer() {
        return new Producer();
    }
}

And the producer itself:

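import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class Producer {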

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, GenericMessage> kafkaTemplate;

    @Async
    public void send(String topic, GenericMessage message) {
        ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {

            @Override
            public void onSuccess(final SendResult<String, GenericMessage> result) {
                // "message" is the enclosing method's parameter; "result" carries the broker metadata
                LOGGER.info("sent message={} with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}
Jorge C answered Sep 26 '22 21:09


Looking at KafkaProducer itself, sending a message has two parts:

  1. Storing the message into the internal buffer.
  2. Uploading the message from the buffer into Kafka.

KafkaProducer is asynchronous for the second part, not the first part.
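To make the two parts concrete, here is a toy model in plain Java. It is not KafkaProducer's implementation: the `TwoPhaseProducer` class, its record-count buffer, and the `maxBlockMs` parameter are hypothetical simplifications (Kafka sizes its buffer in bytes and also waits for metadata, which this model omits). Part 1 appends to a bounded queue on the caller's thread; part 2 drains it on a background thread. When the simulated broker is unreachable nothing is drained, so once the buffer fills, the caller's `send()` blocks for `maxBlockMs` before giving up (this toy reports the `TimeoutException` through the returned future).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy model of a producer whose send() has a blocking part 1 and an asynchronous part 2. */
public class TwoPhaseProducer {

    /** A buffered record plus the future that part 2 completes with its "offset". */
    private static final class Pending {
        final String record;
        final CompletableFuture<Long> result = new CompletableFuture<>();

        Pending(String record) {
            this.record = record;
        }
    }

    private final BlockingQueue<Pending> buffer;
    private final long maxBlockMs;
    private final boolean brokerReachable;
    private long nextOffset = 0; // only used by the sender thread

    public TwoPhaseProducer(int bufferCapacity, long maxBlockMs, boolean brokerReachable) {
        this.buffer = new ArrayBlockingQueue<>(bufferCapacity);
        this.maxBlockMs = maxBlockMs;
        this.brokerReachable = brokerReachable;
        Thread sender = new Thread(this::runSender, "toy-sender");
        sender.setDaemon(true);
        sender.start();
    }

    /** Part 1 runs on the caller's thread and may block for up to maxBlockMs. */
    public CompletableFuture<Long> send(String record) {
        Pending pending = new Pending(record);
        try {
            if (!buffer.offer(pending, maxBlockMs, TimeUnit.MILLISECONDS)) {
                // The caller has already waited maxBlockMs by the time this is reported.
                return CompletableFuture.failedFuture(
                        new TimeoutException("buffer still full after " + maxBlockMs + " ms"));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }
        return pending.result; // part 2 completes this later
    }

    /** Part 2: background thread that drains the buffer only while the broker is reachable. */
    private void runSender() {
        try {
            while (true) {
                if (!brokerReachable) {
                    Thread.sleep(10); // broker down: nothing leaves the buffer
                    continue;
                }
                Pending pending = buffer.take();
                pending.result.complete(nextOffset++); // pretend the broker assigned this offset
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        TwoPhaseProducer unreachable = new TwoPhaseProducer(2, 500, false);
        unreachable.send("a"); // fits in the buffer: returns immediately
        unreachable.send("b"); // fits in the buffer: returns immediately
        long start = System.nanoTime();
        CompletableFuture<Long> third = unreachable.send("c"); // buffer full: blocks ~500 ms
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("third send() returned after " + elapsedMs + " ms, failed="
                + third.isCompletedExceptionally());
    }
}
```

With the broker marked reachable, `send()` returns immediately and its future completes with offset 0; with it unreachable, the third `send()` in `main` returns only after roughly 500 ms, even though it hands back a future.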

The send() method can still block on the first part and eventually throw a TimeoutException, e.g.:

  • The metadata for the topic is not cached or is stale, so the producer fetches it from the server to learn whether the topic still exists and how many partitions it has.
  • The buffer is full (32 MB by default).

If the server is completely unresponsive, you will probably encounter both issues.
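How long that first part may block is capped by the producer's `max.block.ms` setting (default 60000 ms), which covers waiting for metadata and for buffer space. Lowering it does not make send() non-blocking; it only shortens the worst case. A minimal sketch, assuming you would merge these entries into a config map such as the `producerConfigs()` bean above; the 5000 ms value is an arbitrary illustration, not a recommendation. String keys are used so the snippet compiles without kafka-clients; `ProducerConfig.MAX_BLOCK_MS_CONFIG` and `ProducerConfig.BUFFER_MEMORY_CONFIG` hold the same strings.

```java
import java.util.HashMap;
import java.util.Map;

public class BoundedBlockingProducerConfig {

    /** Producer properties that bound how long send() may block before the record is buffered. */
    public static Map<String, Object> producerConfigs(String bootstrapServers, long maxBlockMs) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        // Upper bound on the time send() waits for metadata or buffer space (Kafka default: 60000 ms).
        props.put("max.block.ms", maxBlockMs);
        // Total memory for buffered records (Kafka default: 33554432 bytes, i.e. 32 MB).
        props.put("buffer.memory", 33554432L);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerConfigs("localhost:9092", 5000L));
    }
}
```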

Update:

I tested and confirmed this with Kafka 2.2.1. The behaviour may differ in 2.4 and/or 2.6; see KAFKA-3720.

GeertPt answered Sep 23 '22 21:09