I've been successful in getting a streams app to consume, transform and produce data, but I've noticed that periodically, the streams processor will transition to a state of ERROR
and the process will sit there without exiting.
Showing me logs like:
All stream threads have died. The instance will be in error state and should be closed.
Is there a way to tell the Streams app to exit once its reached the ERROR
state? Maybe a monitor thread of sorts?
I see references in the comments of the Kafka Streams code to the user needing to close the application once its reached this state, however, I haven't been able to find mention of this task in the documentation.
Is there a simple way to do this shutdown step?
My intention was to set the UncaughtExceptionHandler
method on the KafkaStreams
object, to do the following:
close
method on the original KafkaStreams
objectWhat results from that is:
INFO org.apache.kafka.streams.KafkaStreams ... State transition from ERROR to PENDING_SHUTDOWN
INFO org.apache.kafka.streams.processor.internals.StreamThread ... Informed to shut down
And then, unfortunately the process seems to hang without exiting.
FWIW I feel like this is probably a misuse of the setUncaughtExceptionHandler
To stop the application instance, call the KafkaStreams#close() method: // Stop the Kafka Streams threads streams. close(); To allow your application to gracefully shutdown in response to SIGTERM, it is recommended that you add a shutdown hook and call KafkaStreams#close .
« Kafka Summit Americas 2021. The grace period is a parameter of windowed operations such as Window or Session aggregates, or stream-stream joins. This configuration determines how long after a window ends any new data will still be processed.
To address the problem of blocked batches, we set up a distinct retry queue using a separately defined Kafka topic. Under this paradigm, when a consumer handler returns a failed response for a given message after a certain number of retries, the consumer publishes that message to its corresponding retry topic.
Using an UncaughtExceptionHandler
is correct. However, if you call KafkaStreams#close()
within the handler call-back you can run into a deadlock. Thus, you should either only set a flag, and call #close()
outside of the callback, or you use close()
with a timeout. If the timeout expires, a shutdown is forced.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With