Logo Questions Linux Laravel Mysql Ubuntu Git Menu

Kafka Streams: Proper way to exit on error

I've been successful in getting a streams app to consume, transform and produce data, but I've noticed that periodically, the streams processor will transition to a state of ERROR and the process will sit there without exiting.

Showing me logs like:

All stream threads have died. The instance will be in error state and should be closed.

Is there a way to tell the Streams app to exit once its reached the ERROR state? Maybe a monitor thread of sorts?

I see references in the comments of the Kafka Streams code to the user needing to close the application once its reached this state, however, I haven't been able to find mention of this task in the documentation.

Is there a simple way to do this shutdown step?

probably incorrect way to potentially close on errors

My intention was to set the UncaughtExceptionHandler method on the KafkaStreams object, to do the following:

  • log the error
  • shutdown the stream using the close method on the original KafkaStreams object

What results from that is:

  • message for the exception is logged
  • INFO org.apache.kafka.streams.KafkaStreams ... State transition from ERROR to PENDING_SHUTDOWN
  • INFO org.apache.kafka.streams.processor.internals.StreamThread ... Informed to shut down

And then, unfortunately the process seems to hang without exiting.

FWIW I feel like this is probably a misuse of the setUncaughtExceptionHandler

like image 535
rwaweber Avatar asked Mar 06 '18 20:03


People also ask

How do I close a Kafka stream?

To stop the application instance, call the KafkaStreams#close() method: // Stop the Kafka Streams threads streams. close(); To allow your application to gracefully shutdown in response to SIGTERM, it is recommended that you add a shutdown hook and call KafkaStreams#close .

What is grace period in Kafka streams?

« Kafka Summit Americas 2021. The grace period is a parameter of windowed operations such as Window or Session aggregates, or stream-stream joins. This configuration determines how long after a window ends any new data will still be processed.

How does Kafka retry work?

To address the problem of blocked batches, we set up a distinct retry queue using a separately defined Kafka topic. Under this paradigm, when a consumer handler returns a failed response for a given message after a certain number of retries, the consumer publishes that message to its corresponding retry topic.

1 Answers

Using an UncaughtExceptionHandler is correct. However, if you call KafkaStreams#close() within the handler call-back you can run into a deadlock. Thus, you should either only set a flag, and call #close() outside of the callback, or you use close() with a timeout. If the timeout expires, a shutdown is forced.

like image 82
Matthias J. Sax Avatar answered Oct 20 '22 16:10

Matthias J. Sax