I am using ConcurrentMessageListenerContainer with auto commit set to false to consume messages from the topic and write to Database. If Database is down I need to stop the container from processing current record in a poll and don't do next poll(). I have implemented DataSourceHealthIndicator and after checking Database status to UP I want to restart my container again to process remaining records.
Any suggestion how can I stop processing remaining records and stop the container, I have tried to use consumer.close(). But it doesn't stop the process and kept throwing consumer is already closed.
Call container.stop(). If you are using @KafkaListeners, call stop() on the listener container registry which will stop all registered containers.
EDIT
Error handlers for automatically stopping the container have been available since the 2.1 release; at the time of writing the current version is 2.2.3.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With