I have a Kafka Streams app with the driver code below for real-time message transformation.
String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);
source.transform(() -> new MyTransformer()).to(...);
KafkaStreams streams = new KafkaStreams(builder, appConfig);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
logger.error("UncaughtExceptionHandler " + e.getMessage());
System.exit(0);
}
});
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
After a few minutes of execution, the app throws the exception below and then stops making progress through the stream.
[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
... 13 more
I tried to flush out the /tmp/kafka-streams/TRANSFORMATION-APP
directory and restarted the app, but it throws the same exception again. One thing I noticed is that the app works fine until it has transformed all the backlog messages, but throws the exception after processing some of the new messages!
Sometimes it also throws the uncaught exceptions below.
[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created
[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed to rebalance
After throwing (one of) these exceptions, the app is still running but no longer making progress through the stream.
What is the correct way to handle these errors? Is it possible to restart the stream programmatically, without killing the app? This app runs under monit. In the worst case, I would prefer to terminate the app cleanly (without any message loss), so that monit can restart it.
The input topic has 100 partitions, and I have set num.stream.threads to 100 in the app configuration. The app is on Kafka 0.10.1.1-cp1.
Kafka 0.10.1.x
has some bugs with regard to multi-threading. You can either upgrade to 0.10.2
(AK released today; CP 3.2 should follow shortly), or you apply the following workaround:
You might also need to delete your local state directory (only once) before restarting to get into a overall consistent application state.
In any case, there will be no data loss. Kafka Streams guarantees at-least-once processing semantics even in case of failure. This applies to your local stores too -- after you delete the local state dir, that state will be recreated on startup from the underlying Kafka changelog topics (it is an expensive operation, though).
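If you prefer wiping the state directory by hand rather than via KafkaStreams#cleanUp(), a stdlib-only sketch might look like the following. The path assumes the default state.dir plus the application id from the logs above; adjust both, and only run this while the app is stopped:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public class StateDirCleaner {
    // Recursively deletes a Kafka Streams local state directory.
    // The state is rebuilt from the changelog topics on the next start.
    static void wipe(Path stateDir) throws IOException {
        if (!Files.exists(stateDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(stateDir)) {
            paths.sorted(Comparator.reverseOrder()) // delete children before parents
                 .forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) throws IOException {
        // default state.dir plus the application id seen in the exception
        wipe(Paths.get("/tmp/kafka-streams/TRANSFORMATION-APP"));
    }
}
```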
The UncaughtExceptionHandler
only gives you a way to find out that a thread died. It does not (directly) help you restart your application. To recover dead threads, you need to close the KafkaStreams
instance completely and create/start a new one. We hope to add better support for this in the future.
I understand that this question was asked long ago, but I will post an update about a newer Kafka Streams feature. Since Kafka Streams
2.8.0
, you have the ability to automatically replace a stream thread that failed due to an uncaught exception,
using the KafkaStreams
method void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh)
with StreamThreadExceptionResponse.REPLACE_THREAD
. With that, the failed message will be reprocessed on the new replacement stream thread.
For more details, please take a look at Kafka Streams Specific Uncaught Exception Handler
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
Prior to Kafka Streams
2.8.0
, you can implement the logic of restarting a failed KafkaStreams instance on your own. The idea is the following:
KafkaStreams kafkaStreams = createYourKafkaStreams();
kafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, kafkaStreams));

private KafkaStreams.StateListener createErrorStateListener(String sourceTopicName, KafkaStreams kafkaStreams) {
    return (newState, oldState) -> {
        if (newState == KafkaStreams.State.ERROR) {
            log.error("Kafka Stream is in ERROR state for source topic [{}]", sourceTopicName);
            replaceFailedKafkaStream(kafkaStreams, sourceTopicName);
        }
    };
}

// invoke this method either right after the stream died, or via scheduling
private void replaceFailedKafkaStream(KafkaStreams kafkaStreams, String sourceTopicName) {
    kafkaStreams.close();
    KafkaStreams newKafkaStreams = createYourKafkaStreams();
    newKafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, newKafkaStreams));
    newKafkaStreams.start();
}
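One caveat with this listener approach: close() blocks, and calling it from inside the state listener callback (which may run on a stream thread) risks hanging. One way around that is to hand the restart off to a dedicated executor. Below is a stdlib-only sketch with the Kafka-specific close/create/start sequence stubbed out as a Runnable; the class and method names are illustrative, not from any library:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StreamRestarter {
    // Dedicated thread for the blocking close()/start() sequence,
    // keeping it off the stream threads that trigger the state listener.
    private final ExecutorService restartPool = Executors.newSingleThreadExecutor();

    // restartAction would wrap: oldStreams.close(); build a new KafkaStreams; start it
    void scheduleRestart(Runnable restartAction) {
        restartPool.submit(restartAction);
    }

    void shutdown() throws InterruptedException {
        restartPool.shutdown();
        restartPool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```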