I have created a ParallelFlux
and then used .sequential()
, expecting at that point I can count or "reduce" the results of the parallel computations. The problem seems to be that the parallel threads are fired off and nothing is waiting for them.
I have sort of gotten things to work using a CountDownLatch
but I don't think I should have to do that.
TL;DR - I cannot get a result to print out for this code:
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.subscribe(System.out::println);
When executing that code in a main, this is a good demonstration of the asynchronous nature of things. The operation runs on a thread from the elastic scheduler, and subscribe triggers the asynchronous processing so it returns immediately.
There are two ways of synchronizing the end of the application with the end of the Flux
:
print in doOnNext
and use blockLast()
The block* methods are typically used in main and tests, where there is no choice but to switch back to a blocking model (ie because otherwise the test/main would exit before the end of the processing).
We switch the "side effect" of printing each emitted item in a doOnNext
, which is dedicated to that sort of use cases. Then we block up until the completion of the flux, with blockLast()
.
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doOnNext(System.out::println)
.blockLast();
print in subscribe
and use a CountDownLatch
in doFinally
This one is a bit more convoluted but allows you to actually use subscribe if you want to explore how that method works.
We just add a doFinally
, which triggers on any completion signal (cancellation, error or completion). We use a CountDownLatch
to block the main thread, which will be asynchronously counted down from within the doFinally
.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doFinally(signal -> latch.countDown())
.subscribe(System.out::println);
latch.await(10, TimeUnit.SECONDS);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With