Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Project Reactor - How to handle OverflowException from Flux.interval?

I am building a spring boot application using Spring Webflux and I want to make the application fully non-blocking. The application itself has some REST endpoints and a batch job that needs to run every few seconds. For the batch job, I am trying to Flux.interval(Duration.ofMillis(1000)) to generate long values which I ignore and run my scheduled job.

Flux.interval(Duration.ofMillis(1000))
    .flatMap(ignore -> doSomething())
    .subscribe();

However after some time I get the error

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 257 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)

Can someone tell me how to overcome this issue ?

like image 804
DBS Avatar asked Feb 05 '20 13:02

DBS


People also ask

How do you get data from flux?

How to extract data from Flux in Java? Another way would be using the Reactive Streams operators like onNext, flatMap, etc. In the below example, we are using the onNext() method to get data from Flux and print it. Note: We need to subscribe to the Publisher.

What is flux interval?

Flux. interval(Duration) produces a Flux that is infinite and emits regular ticks from a clock. So your first example has no concurrency, everything happens on the same thread. The subscription and all events must complete before the method and process ends. By using Flux.

What is backpressure in reactor?

Backpressure is the ability of a Consumer to signal the Producer that the rate of emission is higher than what it can handle. So using this mechanism, the Consumer gets control over the speed at which data is emitted. If you are new to Project Reactor, read about the Flux in reactive stream.

What is flux in reactor?

Mono and Flux are both reactive streams. They differ in what they express. A Mono is a stream of 0 to 1 element, whereas a Flux is a stream of 0 to N elements.


Video Answer


1 Answers

The cause of the issue is most likely that the doSomething() operation takes longer than the specified Flux interval, which means that after some time doSomething jobs overlap each other and backpressure kicks in. Since Flux.interval is a hot source (meaning it doesn't emit signals on demand) and flatMap has a default limit of concurrency (256), the operator gets overwhelmed and this results in an OverflowException.

Based on your requirements, there a couple of potential solutions for this problem:

1. Ignore the overflow error and drop the signals which would overflow

This means sometimes, we skip a second and don't schedule a job for the interval if we already have a lot (256) in progress.

Flux.interval(Duration.ofMillis(1000))
    .onBackpressureDrop()
    .flatMap(ignore -> doSomething())

2. Set flatMap concurrency to a higher value

This can still result in OverflowException after some time, but it delays the problem (probably not the best solution).

Flux.interval(Duration.ofMillis(1000))
    .flatMap(ignore -> doSomething(), Integer.MAX_VALUE)

3. Don't let jobs overlap each other

We switch from a hot source to a cold source which eliminates the possibility of overflow. However, we lose the guarantee of scheduling an event every second. Instead, they will be scheduled on demand when previous job finished and at least 1 second elapsed.

Mono.just(1).repeat() // infinite Flux with backpressure
    .delayElements(Duration.ofMillis(1000))
    .concatMap(ignore -> doSomething())

You can also combine this solution with the previous one if you are fine with overlapping jobs and define a reasonable concurrency level in the flatMap call.

like image 86
Martin Tarjányi Avatar answered Oct 19 '22 05:10

Martin Tarjányi