Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to set up an entirely backpressure driven flux in Java Reactor?

I have a situation where I need multiple workers (let's say 2). Workers must perform a task consuming upstream events.

The task at hand consumes a list of events and has a constant time independent from the size of the list.

Therefore I wish upstream to deliver a list with all the buffered events only when requested to, 1 list at a time.

Sadly most methods implement prefetching. What happens is that even using limitRate(1, 0) upstream receives one too many onRequest(1), just to replenish the downstream buffer.

So I struggle to have the buffered list produced just when a worker is available: they are usually produced in advance missing my goal of maximizing the size of the buffered list.

How can I implement such a setup?

Is there a way to disable prefetching altogether?

like image 981
Federico Bonelli Avatar asked Nov 07 '22 22:11

Federico Bonelli


1 Answers

Not sure I've understood the question correctly. Sample code that shows what you are doing currently would help.

One way to not pull data from the source until an onRequest is to defer the instantiation of the Flux. So your code would look something like:

Flux source = Flux.defer(() -> getFluxForUpstreamSource());

Another way to consume from a source using backpressure is used Flux.generate. Your code would look something like:

Flux source = Flux.generate(
        UpstreamSource::getConnection,
        (connection, sink) -> {
            try {
                sink.next(connection.getNext());
            } catch (UpstreamException e) {
                sink.error(e);
            }
            return connection;
        }
);
like image 154
Rajesh J Advani Avatar answered Nov 14 '22 15:11

Rajesh J Advani