
Correct way to restart or shutdown the stream using UncaughtExceptionHandler

I have a stream app with the following driver code for real-time message transformation.

String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);

source.transform(() -> new MyTransformer()).to(...);

KafkaStreams streams = new KafkaStreams(builder, appConfig);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("UncaughtExceptionHandler " + e.getMessage());
        System.exit(0);
    }
});


streams.cleanUp();
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

After a few minutes of execution, the app throws the exception below and then stops progressing through the stream.

[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 more

I tried flushing out the /tmp/kafka-streams/TRANSFORMATION-APP directory and restarted the app, but it throws the same exception again. One thing I noticed is that the app works fine until it has transformed all the backlog messages, but throws the exception after processing some of the new messages!

Sometimes it also throws the uncaught exceptions below.

[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created

[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed to rebalance

After throwing (one of) these exceptions, the app is still running but no longer progressing through the stream.

What is the correct way to handle these errors? Is it possible to restart the stream programmatically, without killing the app? This app runs under monit. In the worst case, I would prefer to terminate the app properly (without any message loss), so that monit can restart it.

The input topic has 100 partitions, and I have set num.stream.threads to 100 in the app configuration. The app is on Kafka 0.10.1.1-cp1.

Samy asked Feb 22 '17

2 Answers

Kafka 0.10.1.x has some bugs with regard to multi-threading. You can either upgrade to 0.10.2 (Apache Kafka released it today; CP 3.2 should follow shortly) or apply the following workaround:

  • use single-threaded execution only
  • if you need more threads, start more instances
  • for each instance, configure a different state directory

You might also need to delete your local state directory (only once) before restarting, to get into an overall consistent application state.
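A minimal sketch of the per-instance configuration suggested by the workaround, using plain string property keys (the broker address, application id, and directory path are placeholders, and the helper method is hypothetical):

```java
import java.util.Properties;

public class SingleThreadConfig {
    // Hypothetical helper: builds a per-instance config with one stream thread
    // and a state directory unique to this instance, so concurrently running
    // instances never contend for the same state-directory lock files.
    static Properties buildConfig(String appId, int instanceId) {
        Properties props = new Properties();
        props.put("application.id", appId);               // same for all instances
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("num.stream.threads", "1");             // single-threaded execution
        props.put("state.dir", "/tmp/kafka-streams-instance-" + instanceId);
        return props;
    }

    public static void main(String[] args) {
        Properties p = buildConfig("TRANSFORMATION-APP", 3);
        System.out.println(p.getProperty("state.dir"));
        System.out.println(p.getProperty("num.stream.threads"));
    }
}
```

All instances share the same `application.id` (so they join the same consumer group), and only `state.dir` differs per instance.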

In any case, there will be no data loss. Kafka Streams guarantees at-least-once processing semantics even in case of failure. This applies to your local stores, too: after you delete the local state directory, on startup that state will be recreated from the underlying Kafka changelog topics (it's an expensive operation, though).

The UncaughtExceptionHandler only provides a way to find out that a thread died. It does not (directly) help to restart your application. To recover dead threads, you need to close the KafkaStreams instance completely and create/start a new one. We hope to add better support for this in the future.
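To illustrate why the handler alone cannot recover anything, here is a plain-JDK sketch (no Kafka involved): the handler fires while the thread is already terminating, so all it can do is observe the failure and trigger some external action, such as a clean shutdown:

```java
public class HandlerDemo {
    public static void main(String[] args) throws InterruptedException {
        // A worker that dies from an uncaught exception, like a StreamThread.
        Thread worker = new Thread(() -> { throw new RuntimeException("boom"); });

        final boolean[] notified = {false};
        worker.setUncaughtExceptionHandler((t, e) -> {
            // By the time this runs, the thread is already terminating; the
            // handler can log or initiate a shutdown, but cannot resurrect t.
            notified[0] = true;
        });

        worker.start();
        worker.join();
        System.out.println("notified=" + notified[0] + " alive=" + worker.isAlive());
        // prints: notified=true alive=false
    }
}
```

The dead thread stays dead; replacing it means building and starting a fresh instance, which in the Kafka Streams case means a whole new KafkaStreams object.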

Matthias J. Sax answered Nov 03 '22

I understand that this question was asked long ago, but I will post an update about a new Kafka Streams feature. Since Kafka Streams 2.8.0, you can automatically replace a stream thread that failed with an uncaught exception, using the KafkaStreams method void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh) and returning StreamThreadExceptionResponse.REPLACE_THREAD. With that, the failed message will be reprocessed on the new replacement thread. For more details, take a look at Kafka Streams Specific Uncaught Exception Handler.

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

Prior to Kafka Streams 2.8.0, you can implement the logic of restarting a failed KafkaStreams instance on your own. The idea is the following:

KafkaStreams kafkaStreams = createYourKafkaStreams();
kafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, kafkaStreams));

private KafkaStreams.StateListener createErrorStateListener(String sourceTopicName, KafkaStreams kafkaStreams) {
    return (newState, oldState) -> {
        if (newState == KafkaStreams.State.ERROR) {
            log.error("Kafka Stream is in ERROR state for source topic [{}]", sourceTopicName);
            replaceFailedKafkaStream(kafkaStreams, sourceTopicName);
        }
    };
}

// invoke this method either right after the stream dies, or via scheduling
private void replaceFailedKafkaStream(KafkaStreams kafkaStreams, String sourceTopicName) {
    kafkaStreams.close();
    KafkaStreams newKafkaStreams = createYourKafkaStreams();
    newKafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, newKafkaStreams));
    newKafkaStreams.start();
}
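On the scheduling point in the comment above: close() blocks, so rather than calling it directly on the state-listener callback thread, you can hand the close-and-recreate work to another thread and let the callback return immediately. A stdlib-only sketch, where scheduleRestart is a hypothetical stand-in for replaceFailedKafkaStream:

```java
import java.util.concurrent.CompletableFuture;

public class DeferredRestart {
    // Hypothetical stand-in for replaceFailedKafkaStream: runs the blocking
    // close-and-recreate sequence on a background thread, so the StateListener
    // callback that detected the ERROR state can return immediately.
    static CompletableFuture<String> scheduleRestart(String sourceTopic) {
        return CompletableFuture.supplyAsync(() -> {
            // here you would call kafkaStreams.close(), build a new
            // KafkaStreams instance, attach a fresh listener, and start() it
            return "restarted:" + sourceTopic;
        });
    }

    public static void main(String[] args) {
        // The caller (e.g. the state listener) fires and forgets; join() here
        // is only to show the result in this standalone sketch.
        System.out.println(scheduleRestart("my-topic").join());
        // prints: restarted:my-topic
    }
}
```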
Vasyl Sarzhynskyi answered Nov 03 '22