
Project Reactor - how to process results by window

I have the following Flux:

Flux.range(0, 100)
    .log("before window")
    .window(10)
    .map(Flux::toList)
    .log("after window")
    .map((w) -> {
        System.out.println(w.subscribe().get());
        return 1;
    }) 
    .reduce(0, (a, b) -> a + b)
    .doOnSuccess(System.out::println)
    .subscribe();

I thought that in doOnNext I should be able to get the block of items available at that moment. But .get always fails with a "Timeout on Mono blocking read" exception.

From the log I see that everything before .window was processed, and the resulting log gives:

[           main] before window : onNext(0)
[           main] after window  : onNext({ operator : "BufferAll" })

There isn't much information in the official docs, so I guess I'm misunderstanding something and using the window function incorrectly. But what exactly am I doing wrong here?

UPDATE

I figured out that this particular problem can be avoided if I use .doOnSuccess instead. Like:

.map((w) -> {
    w.doOnSuccess((w2) -> System.out.println(w2)).subscribe();
    return 1;
})

But the real issue is that in my case I need to return a number based on some calculations against the provided data (instead of 1). I can create a new Mono here, but later I still have to .get it, for example in the final .reduce. So if I do .reduce(0, (a, b) -> a + b.get()), it will fail there instead.

How can I safely get a value from a Mono?

UPDATE 2

Now I've removed Flux::toList and do the reduction myself, returning a Mono from the mapping phase after window. That's probably how it should be.

.window(10)
.log("after window")
.map((w) -> {
    // basically I'm reducing the Flux to a Mono<List> and returning the number of [good] elements in it
    return w.reduce(...).map(ids -> 100).subscribe();
})
.reduce(0, (a, b) -> a + b.get()) 

But it doesn't work anyway; it gets stuck in .reduce.

I noticed that if I remove the .reduce step, it works. In that case, processing of the Flux instances provided by .window is executed after the main flow. I don't have any control over it and can't even get the final result, which doesn't make any sense.

Igor Artamonov asked Oct 19 '22 08:10



2 Answers

The issue was that I need to reduce each window before using it further.

Like:

window(...).flatMap( (window) -> window.reduce(...))

I was doing that inside a Mono in my mapper, but that blocks the execution flow there, so it isn't the right place. The reduction must happen right after window, before the next usage.

The correct version is:

Flux.range(0, 100)
    .window(10)
    .flatMap(window -> {
         return window.reduce(new ArrayList<>(), (a, b) -> {
             a.add(b);
             return a;
         });
    })
    .map((list) -> list.size())
    .reduce(0, (a, b) -> a + b)
    .doOnSuccess(System.out::println)
    .subscribe();

I'm converting each window to a List, and then I can use that value in the following operations.
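
For the original goal of returning a number computed from each window, and then reading the final value without calling .get inside the pipeline, the approach above could be sketched as follows. This is only a sketch under assumptions: collectList() is swapped in for the hand-written reduce, countGood is a hypothetical stand-in for the real calculation (here it just counts even numbers), and block() is used once at the very end, outside the reactive chain.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class WindowReduceSketch {

    // Hypothetical stand-in for "some calculation against the provided data":
    // it simply counts the even numbers in one window.
    static int countGood(List<Integer> window) {
        int good = 0;
        for (int id : window) {
            if (id % 2 == 0) {
                good++;
            }
        }
        return good;
    }

    // Builds the pipeline but does not subscribe; the caller decides how to consume it.
    static Mono<Integer> totalGood(int count, int windowSize) {
        return Flux.range(0, count)
            .window(windowSize)                  // Flux<Flux<Integer>>
            .flatMap(w -> w.collectList())       // collapse each window into a List
            .map(WindowReduceSketch::countGood)  // plain synchronous calculation per window
            .reduce(0, Integer::sum);            // sum of the per-window results
    }

    public static void main(String[] args) {
        // block() subscribes and waits for the single result.
        Integer total = totalGood(100, 10).block();
        System.out.println(total); // prints 50
    }
}
```

The point of the design is that no Mono is ever unwrapped inside map or reduce: every inner publisher is flattened by flatMap, and only the outermost Mono is blocked on (or subscribed to) by the caller.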

Igor Artamonov answered Oct 27 '22 22:10



I've been looking at this recently. The .window(3) operator turns the flux into a flux of fluxes. This can, I think, be processed neatly with a nested subscribe. The .buffer() operator collects a flux into a list, which is emitted when the window completes (onComplete()).

This bit of code was the breakthrough for me...

Flux<Integer> flux = Flux.range(1, 10).log();

flux
  .doOnNext(s -> logger.info("pre-window:{}", s))
  .window(3)
  .subscribe(s -> s.log().buffer().subscribe(t -> logger.info("post-window:{}", t)));

Also, simply using .buffer(3) appears to produce similar if not identical results...

flux
    .doOnNext(s -> logger.info("pre-buffer:{}", s))
    .buffer(3)
    .subscribe(t -> logger.info("post-buffer:{}", t));
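
To check the "similar if not identical" impression, the two variants can be collected and compared directly. The sketch below is an illustration rather than a proof: the class and method names are made up, concatMap is used instead of a nested subscribe so that the window order is preserved, and block() is only there to get plain lists to compare.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class WindowVsBufferSketch {

    // buffer(size) emits each group directly as a List.
    static List<List<Integer>> viaBuffer(int count, int size) {
        return Flux.range(1, count)
            .buffer(size)
            .collectList()
            .block();
    }

    // window(size) emits each group as an inner Flux; collectList() turns it into a List.
    static List<List<Integer>> viaWindow(int count, int size) {
        return Flux.range(1, count)
            .window(size)
            .concatMap(w -> w.collectList()) // process windows one after another, in order
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(viaBuffer(10, 3)); // prints [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
        System.out.println(viaWindow(10, 3)); // same groups for this input
    }
}
```

When every window is collected into a list anyway, buffer is the shorter way to write it; window is mainly useful when the elements of a group should be processed as they arrive instead of being held in memory first.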

Hope that helps!

James Burton answered Oct 27 '22 21:10
