
Handling exceptions in Kafka streams

I have gone through multiple posts, but most of them are about handling bad messages, not about handling exceptions that occur while processing them.

I want to know how to handle a message that the stream application has received when an exception occurs while processing it. The exception could have multiple causes, such as a network failure, a RuntimeException, etc.

  • Could someone suggest the right way to do this? Should I use setUncaughtExceptionHandler, or is there a better way?
  • How should retries be handled?
Thiru asked Jul 12 '18 07:07



3 Answers

It depends on what you want to do with exceptions on the producer side. If an exception is thrown in the producer (e.g. due to a network failure or because the Kafka broker has died), the stream dies by default. Since kafka-streams 1.1.0 you can override this default behavior by implementing ProductionExceptionHandler, like the following:

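import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    // Assumes SLF4J; the original snippet used a "log" field without declaring it.
    private static final Logger log = LoggerFactory.getLogger(CustomProductionExceptionHandler.class);

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // The value is null for tombstone records, so guard before decoding it.
        final String value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
        log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]", value, record.topic(), exception);
        // CONTINUE skips the failed record and keeps the stream thread alive.
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
    }
}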

From the handle method you can return CONTINUE if you don't want the stream to die on an exception, or FAIL if you want the stream to stop (FAIL is the default). You also need to specify this class in the streams config:

default.production.exception.handler=com.example.CustomProductionExceptionHandler

Also note that ProductionExceptionHandler handles only exceptions on the producer side. It does not handle exceptions thrown while processing a message in stream methods such as mapValues(..), filter(..), branch(..), etc. You need to wrap the logic of these methods in try / catch blocks (put all of the method logic into the try block to guarantee that you handle all exceptional cases):

.filter((key, value) -> { try {..} catch (Exception e) {..} })
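
To avoid repeating the try / catch in every lambda, the wrapping can be factored into a small helper. This is only a sketch: SafePredicates and guarded are hypothetical names, not part of Kafka Streams, and the example uses java.util.function.BiPredicate and java.util.logging so that it runs without any Kafka dependency.

```java
import java.util.function.BiPredicate;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper (not part of Kafka Streams): wraps record-level logic in try / catch.
final class SafePredicates {
    private static final Logger LOG = Logger.getLogger(SafePredicates.class.getName());

    private SafePredicates() {
    }

    // Returns a predicate that logs any RuntimeException thrown by the delegate
    // and answers with onError instead of letting the exception escape.
    static <K, V> BiPredicate<K, V> guarded(final BiPredicate<K, V> delegate, final boolean onError) {
        return (key, value) -> {
            try {
                return delegate.test(key, value);
            } catch (RuntimeException e) {
                LOG.log(Level.WARNING, "Predicate failed for key " + key + ", using fallback " + onError, e);
                return onError;
            }
        };
    }

    public static void main(final String[] args) {
        final BiPredicate<String, String> isEven = (key, value) -> Integer.parseInt(value) % 2 == 0;
        final BiPredicate<String, String> safe = guarded(isEven, false);
        System.out.println(safe.test("a", "4"));    // true
        System.out.println(safe.test("b", "oops")); // false, the NumberFormatException was caught
    }
}
```

In a topology, such a wrapper could probably be passed as .filter(SafePredicates.guarded(isEven, false)::test), since the Kafka Streams Predicate interface also exposes a two-argument test method. Whether a failing record should be dropped (false) or kept (true) depends on the use case.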

As far as I know, we don't need to handle exceptions on the consumer side explicitly, because Kafka Streams automatically retries consuming later (the offset is not advanced until the messages have been consumed and processed). E.g. if the Kafka broker is unreachable for some time, you will get exceptions from Kafka Streams, and when the broker is back up, Kafka Streams will consume all messages. So in this case we just have a delay and nothing is corrupted or lost.
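
For the retry part of the question, a transient failure inside your own processing code (for example a call to an external service) can be retried in place a bounded number of times before giving up. The following is a minimal sketch in plain Java; BoundedRetry and its parameters are hypothetical names, not a Kafka Streams API.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not part of Kafka Streams): bounded retry with exponential backoff.
final class BoundedRetry {
    private BoundedRetry() {
    }

    // Runs action up to maxAttempts times. Sleeps initialBackoffMs after the first
    // failure and doubles the pause after each further failure. Rethrows the last
    // exception once all attempts are used up.
    static <T> T call(final Callable<T> action, final int maxAttempts, final long initialBackoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoffMs = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and propagate.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                    backoffMs *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(final String[] args) throws Exception {
        final int[] calls = {0};
        final String result = call(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
    }
}
```

The total time spent retrying should stay well below max.poll.interval.ms, because a stream thread that blocks for longer than that is removed from the consumer group. If the retries are exhausted, the exception can be caught and handled as in the try / catch approach above.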

With setUncaughtExceptionHandler (before kafka-streams 2.8.0) you cannot change the default behavior the way you can with ProductionExceptionHandler; with it you can only log the error or send a message to a failure topic.


Update since kafka-streams 2.8.0

Since kafka-streams 2.8.0, you can automatically replace a failed stream thread (one that died because of an uncaught exception) by using the KafkaStreams method void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh); with StreamThreadExceptionResponse.REPLACE_THREAD. For more details, take a look at Kafka Streams Specific Uncaught Exception Handler.

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
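
One caveat with always returning REPLACE_THREAD: if the failure is deterministic (for example a record that always breaks the processing code), each replacement thread could hit the same exception again. A possible guard is to cap how many replacements are allowed within a time window. The sketch below is plain Java; ReplacementBudget is a hypothetical helper, not part of Kafka Streams.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper (not part of Kafka Streams): allows at most maxReplacements
// thread replacements within a sliding window of windowMs milliseconds.
final class ReplacementBudget {
    private final int maxReplacements;
    private final long windowMs;
    private final Deque<Long> recent = new ArrayDeque<>();

    ReplacementBudget(final int maxReplacements, final long windowMs) {
        this.maxReplacements = maxReplacements;
        this.windowMs = windowMs;
    }

    // Returns true if another replacement is allowed at time nowMs and records it.
    // Synchronized because the handler may be invoked from several stream threads.
    synchronized boolean tryAcquire(final long nowMs) {
        // Drop timestamps that have left the window.
        while (!recent.isEmpty() && nowMs - recent.peekFirst() >= windowMs) {
            recent.pollFirst();
        }
        if (recent.size() >= maxReplacements) {
            return false;
        }
        recent.addLast(nowMs);
        return true;
    }

    public static void main(final String[] args) {
        final ReplacementBudget budget = new ReplacementBudget(2, 60_000L);
        System.out.println(budget.tryAcquire(0L));      // true
        System.out.println(budget.tryAcquire(1_000L));  // true
        System.out.println(budget.tryAcquire(2_000L));  // false, the budget is used up
        System.out.println(budget.tryAcquire(61_000L)); // true, earlier entries have left the window
    }
}
```

Inside the handler above, the return value could then be chosen as REPLACE_THREAD when budget.tryAcquire(System.currentTimeMillis()) is true and SHUTDOWN_CLIENT otherwise. The limit of 2 per minute is an arbitrary illustration.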
Vasyl Sarzhynskyi answered Oct 13 '22 02:10


For handling exceptions on the consumer side:

1) You can set a default deserialization exception handler in the streams configuration with the following property.

"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

Apache Kafka provides two deserialization exception handler classes, plus a timestamp extractor that skips records with invalid timestamps:

1) LogAndContinueExceptionHandler, which you can set as

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndContinueExceptionHandler.class);

2) LogAndFailExceptionHandler

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndFailExceptionHandler.class);

3) LogAndSkipOnInvalidTimestamp, which is a TimestampExtractor rather than a DeserializationExceptionHandler, so it is set with the timestamp extractor property:

props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
           LogAndSkipOnInvalidTimestamp.class);

For custom exception handling:

1) You can implement the DeserializationExceptionHandler interface and override the handle() method.

2) Or you can extend the above-mentioned handler classes.

Vishal Pawar answered Oct 13 '22 02:10


setUncaughtExceptionHandler doesn't help to handle the exception; it is invoked after the stream thread has already terminated due to an exception that was not caught.

Kafka provides a few ways to handle exceptions. A simple try-catch would catch exceptions in the processor code, but a Kafka deserialization exception (which can be caused by data issues) and a production exception (which occurs during communication with the broker) require a DeserializationExceptionHandler and a ProductionExceptionHandler respectively. By default, a Kafka Streams application fails if it encounters either of these.

You can find more details in this post.

tsukyonomi06 answered Oct 13 '22 01:10