I have the following Flux:
Flux.range(0, 100)
.log("before window")
.window(10)
.map(Flux::toList)
.log("after window")
.map((w) -> {
System.out.println(w.subscribe().get());
return 1;
})
.reduce(0, (a, b) -> a + b)
.doOnSuccess(System.out::println)
.subscribe();
I thought that in doOnNext I should be able to get the block of items available at that moment. But .get always fails with a "Timeout on Mono blocking read" exception.
From the log I can see that items are processed up to .window, and the resulting log is:
[ main] before window : onNext(0)
[ main] after window : onNext({ operator : "BufferAll" })
There isn't much information in the official docs, so I guess I have misunderstood something and am using the window function incorrectly. But what exactly am I doing wrong here?
UPDATE
I figured out that this particular problem can be avoided if I use .doOnSuccess instead, like this:
.map((w) -> {
w.doOnSuccess((w2) -> System.out.println(w2)).subscribe();
return 1;
})
But the real issue is that in my case I need to return a number based on some calculations against the provided data (instead of 1). I can create a new Mono here, but later I still have to call .get on it, for example in the final .reduce. So if I do .reduce(0, (a, b) -> a + b.get()), it will fail there instead.
How can I safely get the value from a Mono?
UPDATE 2
Now I've removed Flux::toList and do the conversion myself, returning a Mono from the mapping phase after window. That's probably how it should be.
.window(10)
.log("after window")
.map((w) -> {
// basically I'm reducing the Flux to a Mono<List> and returning the number of [good] elements in it
return w.reduce(...).map(ids -> 100).subscribe();
})
.reduce(0, (a, b) -> a + b.get())
But it still doesn't work; it gets stuck in .reduce.
I noticed that if I remove the .reduce step, it works. In that case, processing of the Flux instances provided by .window is executed after the main flow. I don't have any control over it and can't even get the final result, which doesn't make any sense.
The issue was that I need to reduce each window before using it further, like this:
window(...).flatMap( (window) -> window.reduce(...))
I was doing that inside a Mono in my mapper, but that blocks the execution flow there, so it isn't the right place. The reduction must happen right after window, before the next operator uses the result.
The correct version is:
Flux.range(0, 100)
.window(10)
.flatMap(window -> {
return window.reduce(new ArrayList<>(), (a, b) -> {
a.add(b);
return a;
});
})
.map((list) -> list.size())
.reduce(0, (a, b) -> a + b)
.doOnSuccess(System.out::println)
.subscribe();
I'm converting each window to a List, and then I can use that value in the following operations.
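A possible shortening of the version above, as a sketch that assumes Reactor 3 (reactor-core) is on the classpath: Flux.collectList() gathers each window into a Mono<List<T>>, so the hand-written reduce into an ArrayList may not be needed. The class name WindowSizes and the method countViaWindows are made up for illustration.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class WindowSizes {

    // Hypothetical helper: splits 0..99 into windows of 10, turns each
    // window into a List, and sums the list sizes.
    static Mono<Integer> countViaWindows() {
        return Flux.range(0, 100)
                .window(10)
                // flatMap subscribes to each inner window and emits its List
                .flatMap(Flux::collectList)
                .map(List::size)
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        // block() is used here only to read the result in a plain main method
        System.out.println(countViaWindows().block());
    }
}
```

The point is the same as in the answer: the inner Flux is consumed by flatMap as part of the pipeline, so nothing inside a mapper has to wait on a Mono.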
I've been looking at this recently. The .window(3) function turns the source into a flux of fluxes. This can, I think, be processed neatly by doing a nested subscribe. The .buffer() function collects the inner flux into a list once the window completes (onComplete()).
This bit of code was the breakthrough for me:
Flux<Integer> flux = Flux.range(1, 10).log();
flux
.doOnNext(s -> logger.info("pre-window:{}", s))
.window(3)
.subscribe(s -> s.log().buffer().subscribe(t -> logger.info("post-window:{}", t)));
Also, simply using .buffer(3) appears to produce similar, if not identical, results:
flux
.doOnNext(s -> logger.info("pre-buffer:{}", s))
.buffer(3)
.subscribe(t -> logger.info("post-buffer:{}", t));
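To check the "similar if not identical" impression, here is a small sketch that collects both variants into plain lists so they can be compared. It assumes Reactor 3 on the classpath; the class name BufferVsWindow and its two methods are hypothetical, and concatMap is used instead of a nested subscribe so that the windows are read strictly in order.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class BufferVsWindow {

    // Groups 1..10 into lists of up to 3 items using buffer(3).
    static List<List<Integer>> viaBuffer() {
        return Flux.range(1, 10)
                .buffer(3)
                .collectList()
                .block();
    }

    // Same grouping, built from window(3) by collecting each inner Flux.
    static List<List<Integer>> viaWindow() {
        return Flux.range(1, 10)
                .window(3)
                .concatMap(Flux::collectList)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(viaBuffer());
        System.out.println(viaWindow());
    }
}
```

For this synchronous source both methods should yield the same four groups, the last one holding the single leftover item.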
Hope that helps!