I'm relatively new to reactive programming and Reactor. I have a situation in which I want to bufferTimeout the values in my stream while keeping the demand under my control (no unbounded request), so that I can manually request batches of values. The following sample illustrates it:
// Imports used by the sample, for reference
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static java.time.Duration.ofMillis;
import static reactor.core.scheduler.Schedulers.parallel;

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

Flux<Object> flux = Flux.generate(sink -> {
    try {
        // Block until a value is offered, then emit it
        sink.next(queue.poll(10, TimeUnit.DAYS));
    }
    catch (InterruptedException e) {}
});

BaseSubscriber<List<Object>> subscriber = new BaseSubscriber<List<Object>>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // Don't request unbounded
    }

    @Override
    protected void hookOnNext(List<Object> value) {
        System.out.println(value);
    }
};

flux.subscribeOn(parallel())
    .log()
    .bufferTimeout(10, ofMillis(200))
    .subscribe(subscriber);

subscriber.request(1);

// Offer a partial batch of values
queue.offer(1);
queue.offer(2);
queue.offer(3);
queue.offer(4);
queue.offer(5);

// Wait for timeout, expect [1, 2, 3, 4, 5] to be printed
Thread.sleep(500);

// Offer more values
queue.offer(6);
queue.offer(7);
queue.offer(8);
queue.offer(9);
queue.offer(10);
Thread.sleep(1000);
This is the output:
[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[ INFO] (main) request(10)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onNext(2)
[ INFO] (parallel-1) onNext(3)
[ INFO] (parallel-1) onNext(4)
[ INFO] (parallel-1) onNext(5)
[1, 2, 3, 4, 5]
[ INFO] (parallel-1) onNext(6)
[ INFO] (parallel-1) onNext(7)
[ INFO] (parallel-1) onNext(8)
[ INFO] (parallel-1) onNext(9)
[ INFO] (parallel-1) onNext(10)
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
I actually expected this: as I understand it, bufferTimeout requests 10 values upstream, and the upstream is unaware of the timeout, so it keeps producing all 10 regardless. Since the one and only downstream request was already consumed when the timeout fired, the values offered later still get produced and overflow the buffer.
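(As an aside, the ErrorCallbackNotImplemented wrapper in the stack trace only shows up because my subscriber doesn't override hookOnError. Handling the error doesn't solve the overflow, but it would keep the failure explicit; a minimal addition to the anonymous subscriber above would be something like:)

    @Override
    protected void hookOnError(Throwable throwable) {
        // Just log it; the overflow itself still has to be prevented upstream
        System.err.println("Stream failed: " + throwable);
    }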
I wonder if it's possible to prevent the remaining values from being produced after the timeout has fired, or to buffer them without losing control. I have tried (see the sketch at the end of the question for where each operator goes):

- limitRate(1) before bufferTimeout, trying to make the buffer request values "on demand". It does request one by one, but 10 times, because the buffer asked for 10 values.
- onBackpressureBuffer(10), since the problem is basically the definition of backpressure, if I got it right. The idea was to buffer the values overflowing from the timed-out request, but this operator requests an unbounded amount upstream, which I'd like to avoid.

It looks like I'll have to write my own bufferTimeout implementation, but I'm told that writing publishers is hard. Am I missing something? Or am I doing reactive wrong?
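For reference, this is roughly where those operators went in my attempts (same flux, subscriber and timings as in the sample above; the exact placement is from memory):

    // Attempt 1: limitRate(1) before bufferTimeout. The buffer still needs 10
    // values per batch, so it simply issues ten request(1) calls upstream.
    flux.subscribeOn(parallel())
        .limitRate(1)
        .bufferTimeout(10, ofMillis(200))
        .subscribe(subscriber);

    // Attempt 2: onBackpressureBuffer(10) after bufferTimeout. The overflow
    // complaint should go away, but onBackpressureBuffer requests an unbounded
    // amount from its upstream, which is what I wanted to avoid in the first place.
    flux.subscribeOn(parallel())
        .bufferTimeout(10, ofMillis(200))
        .onBackpressureBuffer(10)
        .subscribe(subscriber);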
Solved it by implementing my own subscriber:
https://gist.github.com/hossomi/5edf60acb534a16c025e12e4e803d014
It requests only the needed amount of values upstream and buffers any that arrive while there is no active request. The buffer is unbounded, so you might want to use it with caution or change that.
It's most likely not as reliable as a standard Reactor operator, but it works for me. Suggestions are welcome!
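In case the gist ever goes away, the core idea looks roughly like this (a simplified, synchronized sketch of the approach, not the exact gist code; the names BatchingSubscriber and requestBatch are just for illustration, it assumes one batch at a time, and timer shutdown is omitted):

    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;

    import java.time.Duration;
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;

    // A subscriber that assembles batches itself instead of relying on bufferTimeout.
    // It only requests upstream what the current batch still needs, and values that
    // arrive after a batch was closed by the timeout are parked (unbounded!) for the
    // next batch.
    class BatchingSubscriber<T> extends BaseSubscriber<T> {

        private final Consumer<List<T>> consumer;
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        private final Queue<T> parked = new ArrayDeque<>(); // values received with no active batch
        private List<T> batch;                               // null = no active batch
        private int batchSize;
        private long outstanding;                            // requested upstream, not yet received
        private ScheduledFuture<?> timeout;

        BatchingSubscriber(Consumer<List<T>> consumer) {
            this.consumer = consumer;
        }

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            // Request nothing here; demand is driven entirely by requestBatch()
        }

        // Call again only after the previous batch has been delivered
        public synchronized void requestBatch(int size, Duration maxWait) {
            batch = new ArrayList<>(size);
            batchSize = size;
            // Serve the new batch from parked values first
            while (batch.size() < size && !parked.isEmpty()) {
                batch.add(parked.poll());
            }
            if (batch.size() == size) {
                deliver();
                return;
            }
            // Only ask upstream for what is still missing and not already in flight
            long missing = size - batch.size() - outstanding;
            if (missing > 0) {
                outstanding += missing;
                request(missing);
            }
            timeout = timer.schedule(this::onTimeout, maxWait.toMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        protected synchronized void hookOnNext(T value) {
            outstanding--;
            if (batch == null) {
                parked.add(value);      // late arrival from a timed-out batch
            } else {
                batch.add(value);
                if (batch.size() == batchSize) {
                    timeout.cancel(false);
                    deliver();
                }
            }
        }

        private synchronized void onTimeout() {
            if (batch != null) {
                deliver();              // emit whatever we have, possibly an empty list
            }
        }

        private void deliver() {
            List<T> emitted = batch;
            batch = null;
            consumer.accept(emitted);
        }
    }

Usage would then be something like flux.subscribeOn(parallel()).subscribe(sub) followed by sub.requestBatch(10, ofMillis(200)) whenever the next batch is wanted.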