I have a situation where I need multiple workers (say 2). Each worker performs a task that consumes upstream events.
The task consumes a list of events, and its running time is constant regardless of the size of the list.
Therefore I want upstream to deliver a list containing all the buffered events only when requested, one list at a time.
Sadly most methods implement prefetching.
What happens is that even using limitRate(1, 0), upstream receives one too many onRequest(1), just to replenish the downstream buffer.
So I struggle to have the buffered list produced only when a worker is available: lists are usually produced in advance, which defeats my goal of maximizing the size of each buffered list.
How can I implement such a setup?
Is there a way to disable prefetching altogether?
Not sure I've understood the question correctly. Sample code that shows what you are doing currently would help.
One way to not pull data from the source until an onRequest is to defer the instantiation of the Flux. So your code would look something like:
Flux source = Flux.defer(() -> getFluxForUpstreamSource());
Another way to consume from a source using backpressure is to use Flux.generate. Your code would look something like:
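Flux source = Flux.generate(
    // state supplier: called once per subscriber to open the connection
    UpstreamSource::getConnection,
    // generator: invoked only when downstream has requested an item
    (connection, sink) -> {
        try {
            sink.next(connection.getNext());
        } catch (UpstreamException e) {
            sink.error(e);
        }
        return connection;
    }
);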
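To get one list containing everything buffered so far on each request, the generator above could drain an accumulating buffer instead of reading a single element. Below is a minimal sketch in plain Java, not Reactor API: DrainOnDemandBuffer, offer and drain are hypothetical names made up for this illustration, and it assumes the upstream events can be pushed into the buffer as they arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper (not part of Reactor): accumulates events and
// hands them out as one list only when somebody asks for them.
class DrainOnDemandBuffer<T> {
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

    // Called by the upstream producer whenever an event arrives.
    void offer(T event) {
        queue.offer(event);
    }

    // Called when a worker is free: returns everything buffered so far
    // (possibly an empty list) and leaves the buffer empty.
    List<T> drain() {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        DrainOnDemandBuffer<String> buffer = new DrainOnDemandBuffer<>();
        buffer.offer("a");
        buffer.offer("b");
        buffer.offer("c");
        System.out.println(buffer.drain()); // all three events in one list
        buffer.offer("d");
        System.out.println(buffer.drain()); // only the event that arrived since
    }
}
```

Inside the generator you would then call something like sink.next(buffer.drain()), so a list is built only when a request actually reaches the generator. Note that drain() may return an empty list if nothing has arrived yet; whether to emit, skip, or wait in that case depends on your setup. Any prefetching operator placed between the generator and the workers will still request ahead, so this only helps if demand reaches the generator one request per free worker.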