I am building a Spring Boot application using Spring WebFlux and I want to make the application fully non-blocking. The application itself has some REST endpoints and a batch job that needs to run every few seconds. For the batch job, I am trying to use Flux.interval(Duration.ofMillis(1000)) to generate long values, which I ignore, and then run my scheduled job:
Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething())
.subscribe();
However, after some time I get the error:
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 257 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Can someone tell me how to overcome this issue?
Backpressure is the ability of a consumer to signal to the producer that the rate of emission is higher than it can handle. With this mechanism, the consumer controls the speed at which data is emitted.
Mono and Flux are both reactive streams. They differ in what they express. A Mono is a stream of 0 to 1 element, whereas a Flux is a stream of 0 to N elements.
The cause of the issue is most likely that the doSomething() operation takes much longer than the interval period, so after some time the doSomething() jobs overlap each other and backpressure kicks in. flatMap has a default concurrency limit of 256: once that many jobs are in flight, it stops requesting further ticks until one of them completes. Flux.interval, however, behaves like a hot source (it emits ticks on a fixed schedule rather than on demand), so a tick that arrives while there is no outstanding request cannot be delivered, and this results in an OverflowException.
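The demand accounting described above can be sketched in plain Java. This is a simplified model, not Reactor code: the class name, method name and parameters are hypothetical, every job is assumed to take the same fixed time, and the consumer is assumed to request `concurrency` ticks up front and one more each time a job completes.

```java
import java.util.PriorityQueue;

// Simplified model of a fixed-rate ticker feeding a consumer with bounded concurrency.
// All names here are illustrative; nothing in this class is part of Reactor.
final class IntervalDemandModel {

    /**
     * Returns the index of the first tick that finds no outstanding demand,
     * or -1 if none of the first maxTicks ticks overflows.
     */
    static long firstOverflowTick(long periodMs, long jobMs, int concurrency, long maxTicks) {
        // Completion times of the jobs that are currently in flight.
        PriorityQueue<Long> completions = new PriorityQueue<>();
        long demand = concurrency; // initial request made by the consumer
        for (long tick = 0; tick < maxTicks; tick++) {
            long now = (tick + 1) * periodMs; // tick n fires at (n + 1) * period
            // Every job that has finished by now hands one unit of demand back.
            while (!completions.isEmpty() && completions.peek() <= now) {
                completions.poll();
                demand++;
            }
            if (demand == 0) {
                return tick; // a time-driven source would signal an overflow here
            }
            demand--;
            completions.add(now + jobMs);
        }
        return -1;
    }

    public static void main(String[] args) {
        // 5-second jobs: about 5 in flight at any time, so demand never runs out.
        System.out.println(firstOverflowTick(1000, 5_000, 256, 10_000));   // prints -1
        // 300-second jobs: 256 jobs pile up before the first one completes.
        System.out.println(firstOverflowTick(1000, 300_000, 256, 10_000)); // prints 256
    }
}
```

Under these assumptions the number of jobs in flight settles at roughly the job duration divided by the period. The overflow therefore only appears when jobs are slow enough, or never complete, so that 256 of them accumulate.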
Based on your requirements, there are a few potential solutions to this problem:
Drop ticks with onBackpressureDrop(). This means that we sometimes skip a second and don't schedule a job for that interval if we already have a lot (256) in progress.
Flux.interval(Duration.ofMillis(1000))
.onBackpressureDrop()
.flatMap(ignore -> doSomething())
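Set the flatMap concurrency to a higher value. With a finite limit this can still result in an OverflowException after some time, so it only delays the problem. With Integer.MAX_VALUE, as below, flatMap requests an unbounded number of ticks, so the number of in-flight jobs is no longer limited at all (probably not the best solution).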
Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething(), Integer.MAX_VALUE)
Use a source that emits on demand. We switch from a hot source to a cold source, which eliminates the possibility of overflow. However, we lose the guarantee of scheduling an event every second. Instead, events will be scheduled on demand once the previous job has finished and at least 1 second has elapsed.
Mono.just(1).repeat() // infinite Flux with backpressure
.delayElements(Duration.ofMillis(1000))
.concatMap(ignore -> doSomething())
You can also combine this solution with the previous one if you are fine with overlapping jobs: replace concatMap with flatMap and define a reasonable concurrency level in the flatMap call.
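A minimal sketch of that combination follows, assuming reactor-core is on the classpath. The class name, the `schedule` helper, the concurrency of 4 and the stand-in `doSomething()` are all hypothetical and only illustrate the shape of the pipeline.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class BoundedOverlapJob {

    // Hypothetical stand-in for the real job: completes after 50 ms.
    static Mono<String> doSomething() {
        return Mono.delay(Duration.ofMillis(50)).thenReturn("done");
    }

    // On-demand source combined with flatMap and an explicit concurrency limit,
    // so at most maxInFlight jobs overlap.
    static Flux<String> schedule(Duration period, int maxInFlight) {
        return Mono.just(1).repeat()               // infinite Flux that emits only on request
                .delayElements(period)             // space the elements by the given period
                .flatMap(ignore -> doSomething(), maxInFlight);
    }

    public static void main(String[] args) {
        // take(3) cancels the infinite source after three results, so the demo terminates.
        List<String> results = schedule(Duration.ofMillis(100), 4)
                .take(3)
                .collectList()
                .block();
        System.out.println(results); // prints [done, done, done]
    }
}
```

In the application itself you would call subscribe() on the returned Flux instead of take(3) and block(), which are used here only so the demo terminates.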