I have a Spring Cloud Stream project using the Kafka binder and I'm trying to understand and eventually customize the RetryTemplate used by Cloud Stream.
I'm not finding a lot of documentation on how this works, but what I've read leads me to the following assumptions:
@StreamListener will trigger Spring Retry
Are these assumptions correct?
Now, in my application, I have a pattern where some messages can be handled immediately, but others must be deferred and tried again later (using exponential backoff, etc.).
Should I be throwing an exception causing Spring Cloud Stream to retry these messages at the binder layer, or implementing retry myself and tracking my own retry contexts?
If I should be relying on Cloud Stream's retry setup, how should I customize the backoff policies, etc?
The default retry configuration is 3 attempts, 1 second initial delay, 2.0 multiplier, max delay 10 seconds.
By default stateless retry is used, meaning that the retries are in memory.
The aggregate delay for all retries for all records returned by a poll() must not exceed max.poll.interval.ms.
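To make that constraint concrete, here is a rough back-of-the-envelope sketch in plain Java. It assumes the default retry settings quoted above, plus the Kafka consumer defaults of max.poll.records=500 and max.poll.interval.ms=300000; the class and method names are made up for illustration, and processing time is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring or Kafka: estimates how long
// in-memory (stateless) retries can block the consumer thread.
class RetryBudget {

    // Back-off delays slept between attempts: maxAttempts - 1 sleeps,
    // each capped at maxDelayMs.
    static List<Long> delays(int maxAttempts, long initialMs, double multiplier, long maxDelayMs) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        for (int i = 1; i < maxAttempts; i++) {
            result.add(Math.min((long) next, maxDelayMs));
            next *= multiplier;
        }
        return result;
    }

    // Worst case: every record returned by one poll() exhausts its retries.
    static long worstCaseMs(List<Long> perRecordDelays, int recordsPerPoll) {
        long perRecord = perRecordDelays.stream().mapToLong(Long::longValue).sum();
        return perRecord * recordsPerPoll;
    }

    public static void main(String[] args) {
        List<Long> d = delays(3, 1000L, 2.0, 10_000L); // binder retry defaults
        long worst = worstCaseMs(d, 500);               // assumed max.poll.records
        System.out.println("per-record delays (ms): " + d);
        System.out.println("worst case per poll (ms): " + worst);
        System.out.println("exceeds max.poll.interval.ms: " + (worst > 300_000L));
    }
}
```

Under those assumptions each failing record costs 3 seconds of back-off, so a full batch of 500 failing records would block the consumer for about 25 minutes, well past the 5 minute poll interval.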
With modern versions of Spring for Apache Kafka (used by the binder), it is better to disable binder retries (maxAttempts=1) and use a SeekToCurrentErrorHandler with an appropriate BackOff configured.
You can set the error handler with a ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> @Bean with return (container, dest, grp) -> container.setErrorHandler(handler).
This avoids the problem mentioned above: the error handler seeks back to the failed record so that it is redelivered by a later poll(), and only the maximum delay for a single record must be less than max.poll.interval.ms.
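Putting those pieces together, a configuration along these lines should work. Treat it as a sketch rather than a drop-in: the class name, bean name and back-off values are placeholders I picked, it needs the Kafka binder and Spring for Apache Kafka on the classpath, and setErrorHandler belongs to the 2.x line of Spring for Apache Kafka (newer versions replace it with a common error handler API).

```java
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;

// Also disable binder-level retry for the binding, e.g.
// spring.cloud.stream.bindings.<bindingName>.consumer.maxAttempts=1
@Configuration
public class KafkaRetryConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> retryCustomizer() {
        // Illustrative values: start at 1 s, double each time, cap each delay at 10 s.
        ExponentialBackOff backOff = new ExponentialBackOff(1_000L, 2.0);
        backOff.setMaxInterval(10_000L);
        // Without a limit the back-off never stops; give up once roughly
        // 30 s of back-off has accumulated for a record.
        backOff.setMaxElapsedTime(30_000L);

        SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(backOff);

        // Applied to every listener container the binder creates.
        return (container, dest, grp) -> container.setErrorHandler(handler);
    }
}
```

Because each individual delay is capped at 10 seconds here, it stays far below the default max.poll.interval.ms no matter how many records a poll returns.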
You can also classify which exceptions are retryable and which are not, as well as configuring a dead-letter recoverer which is invoked after retries are exhausted.
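As a sketch of what that can look like with a SeekToCurrentErrorHandler: the class name, the choice of IllegalArgumentException as the non-retryable exception and the back-off values are illustrative assumptions, and the name of the classification method has changed between Spring for Apache Kafka versions, so check it against the version you are using.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical factory class; only the Spring types are real.
public final class ErrorHandlers {

    private ErrorHandlers() {
    }

    public static SeekToCurrentErrorHandler withDeadLetter(KafkaTemplate<Object, Object> template) {
        // Called once retries are exhausted; by default it publishes the
        // failed record to a topic named <originalTopic>.DLT.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);

        // 3 retries, 2 s apart (4 delivery attempts in total), then recover.
        SeekToCurrentErrorHandler handler =
                new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(2_000L, 3L));

        // Assumption: this failure is permanent, so skip the retries and go
        // straight to the recoverer. Older 2.x releases name this method
        // addNotRetryableException (singular).
        handler.addNotRetryableExceptions(IllegalArgumentException.class);
        return handler;
    }
}
```

The handler returned here would be passed to container.setErrorHandler(...) in the customizer bean described above.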
See the reference documentation.