
Guaranteed delivery of multiple messages to Kafka cluster

If I publish several messages in a row to a Kafka cluster (using the new Producer API), I get a Future from the producer for each message.

Now, assuming I have configured my producer with max.in.flight.requests.per.connection = 1 and retries > 0, can I just wait on the last future and be certain that all previous ones have also been delivered (and in order)? Or do I need to wait on all Futures? In code, can I do this:

Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
  f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
  f.get();
} catch(ExecutionException e) {
  //handle exception
}

instead of this:

Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
  futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue())));
}
try {
  for(Future<?> f : futureList) {
    f.get();
  }
} catch(ExecutionException e) {
  //handle exception
}

and be assured that if nothing is caught here (from first snippet):

try {
  f.get();
} catch(ExecutionException e) {

then all my messages have been stored in the cluster in order (whether or not the producer performed any retries under the hood)? And if something goes wrong, WILL I get an exception there even if the future that first hit the problem was not the last one (the one I'm waiting on)?

Are there any more strange corner cases to be aware of?

Manjabes asked Mar 29 '16 09:03


2 Answers

You can do this, but only if you a) set retries to be infinite (or effectively infinite) and b) are OK with discarding data if you encounter a non-retriable exception.
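As a rough sketch of the settings being discussed, the configuration might be built like this. The property keys are the standard producer config names; the class name, the broker address and the choice of Integer.MAX_VALUE as "effectively infinite" are assumptions for illustration, so check them against your client version.

```java
import java.util.Properties;

final class OrderedProducerConfig {
    // Builds producer settings aimed at in-order delivery with retries.
    // The bootstrap address passed in is a placeholder, not a real cluster.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Only one unacknowledged request per connection, so a retry cannot overtake a later batch.
        props.put("max.in.flight.requests.per.connection", "1");
        // "Effectively infinite" retries (assumption: the largest int is acceptable here).
        props.put("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would be passed to the KafkaProducer constructor in place of myConfig.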

To explain a bit more, Kafka has two classes of exceptions. Retriable exceptions are failures where the request might succeed if you send it again. For example, the NotEnoughReplicasException indicates that there are fewer replicas than you require, so the request gets rejected. But if a failed broker comes back online, you might have enough replicas again, and the request will succeed if you resend it. In contrast, a SerializationException is not retriable because there is no reason to believe that serializing the same record again would give a different result.

The producer retries only apply up to the point you hit a non-retriable exception. So if you never hit any of these, use infinite retries, and use the other settings you mentioned, the ordering and successful delivery are guaranteed once the final future has been resolved. However, since you might encounter non-retriable exceptions, it is definitely much better to handle each future (or callback) and ensure you at least log something if a request fails.
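To make the risk concrete, here is a small simulation that needs only the JDK. It does not talk to Kafka: CompletableFuture stands in for the futures returned by send(), and the IllegalStateException is a made-up placeholder for a non-retriable error on the second record. All names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class LastFutureDemo {
    // Returns true if get() completes normally, false if the future failed.
    static boolean succeeded(Future<?> future) {
        try {
            future.get();
            return true;
        } catch (ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Three simulated sends; the second one fails, the others succeed.
    static List<Future<String>> simulatedSends() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated non-retriable failure"));
        return List.of(
                CompletableFuture.completedFuture("record-0"),
                failed,
                CompletableFuture.completedFuture("record-2"));
    }

    public static void main(String[] args) {
        List<Future<String>> futures = simulatedSends();
        // Waiting only on the last future reports success...
        System.out.println("last ok: " + succeeded(futures.get(futures.size() - 1)));
        // ...even though an earlier record was dropped.
        for (int i = 0; i < futures.size(); i++) {
            System.out.println("record " + i + " ok: " + succeeded(futures.get(i)));
        }
    }
}
```

In this simulation the check on the last future passes while the second record is lost, which is why inspecting every future (or callback) is the safer habit.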

Ewen Cheslack-Postava answered Nov 15 '22 06:11


Further to what Ewen said, you could also call flush() after you have finished sending all your messages in the loop. This call blocks until all futures have completed, so afterwards you can check the futures for any exceptions. You'd need to hold on to all the futures to be able to do this, though.
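Checking the retained futures could look roughly like the helper below. It is a sketch using only java.util.concurrent; the class and method names are hypothetical, and in real code the list would hold the futures returned by producer.send(), inspected after producer.flush() has returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class FutureChecks {
    // Walks every future and gathers the cause of each failed one.
    static List<Throwable> collectFailures(List<? extends Future<?>> futures) {
        List<Throwable> failures = new ArrayList<>();
        for (Future<?> future : futures) {
            try {
                future.get(); // returns immediately once the future is complete
            } catch (ExecutionException e) {
                failures.add(e.getCause());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting on the rest.
                Thread.currentThread().interrupt();
                failures.add(e);
                break;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Stand-ins for send() results: one success, one simulated failure.
        CompletableFuture<String> bad = new CompletableFuture<>();
        bad.completeExceptionally(new RuntimeException("simulated send failure"));
        List<Throwable> failures =
                collectFailures(List.of(CompletableFuture.completedFuture("ok"), bad));
        System.out.println("failed sends: " + failures.size());
    }
}
```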

An alternative would be to pass a callback to each send and store any returned exceptions, as shown below. Calling flush() again ensures that all sends have completed before you check for exceptions.

Producer<String, String> producer = new KafkaProducer<>(myConfig);
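// Callbacks generally run on the producer's I/O thread, so use a thread-safe list.
final List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>());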

for(MessageType message : messages){
  producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception != null) {
        exceptionList.add(exception);
      }
    }
  });
}

producer.flush();

if (!exceptionList.isEmpty()) {
  // do stuff
}

Sönke Liebau answered Nov 15 '22 07:11