
Waiting for ParallelFlux completion

I have created a ParallelFlux and then called .sequential(), expecting that at that point I could count or "reduce" the results of the parallel computations. The problem seems to be that the parallel threads are fired off and nothing waits for them.

I have sort of gotten things to work using a CountDownLatch but I don't think I should have to do that.

TL;DR - I cannot get a result to print out for this code:

    Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .subscribe(System.out::println);
K.Nicholas asked Mar 23 '20 21:03


1 Answer

Run from a main method, that code is a good demonstration of the asynchronous nature of things. The work runs on a thread from the elastic scheduler, and subscribe only triggers the asynchronous processing, so it returns immediately. The main thread then reaches the end of main and the application exits before the count is ever emitted.
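To make the "returns immediately" part concrete, here is a minimal sketch using only JDK classes, with no Reactor involved. The single-thread executor is a hypothetical stand-in for the scheduler thread, and the class and method names are made up for illustration. The task is parked on a gate so that it cannot possibly have finished when the hand-off call returns:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class AsyncHandoffDemo {

    // Returns true if the hand-off call returned while the task was still unfinished.
    static boolean submitReturnsBeforeWorkIsDone() {
        // Daemon worker thread: an assumed stand-in for a scheduler thread.
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "worker");
            t.setDaemon(true);
            return t;
        });
        CountDownLatch gate = new CountDownLatch(1); // holds the task back
        CountDownLatch done = new CountDownLatch(1); // signals that the task ended
        AtomicBoolean finished = new AtomicBoolean(false);

        worker.execute(() -> {
            try {
                gate.await();          // the task cannot finish until the gate opens
                finished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });

        // execute() has already returned here, yet the task is still parked on the gate.
        boolean returnedEarly = !finished.get();

        gate.countDown();              // let the task finish
        try {
            done.await();              // wait for it so the demo shuts down cleanly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        worker.shutdown();
        return returnedEarly;
    }

    public static void main(String[] args) {
        // prints "returned before work finished: true"
        System.out.println("returned before work finished: " + submitReturnsBeforeWorkIsDone());
    }
}
```

In the question's code nothing plays the role of done.await(), so main simply runs off the end while the worker is still busy.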

There are two ways of synchronizing the end of the application with the end of the Flux:

print in doOnNext and use block()

The block* methods are typically used in main and tests, where there is no choice but to switch back to a blocking model (i.e. because otherwise the test/main would exit before the end of the processing).

We move the "side effect" of printing the emitted item into a doOnNext, which is dedicated to that sort of use case. Then we block until completion. Since count() turns the Flux into a Mono<Long>, the blocking call here is block(); blockLast() is the Flux equivalent and is not available on a Mono.

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doOnNext(System.out::println)
    .block();

print in subscribe and use a CountDownLatch in doFinally

This one is a bit more convoluted but allows you to actually use subscribe if you want to explore how that method works.

We just add a doFinally, which triggers on any terminating signal (cancellation, error or completion). We use a CountDownLatch to block the main thread; the latch is counted down asynchronously from within the doFinally.

CountDownLatch latch = new CountDownLatch(1);

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doFinally(signal -> latch.countDown())
    .subscribe(System.out::println);

latch.await(10, TimeUnit.SECONDS);
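One detail the snippet above glosses over: latch.await(10, TimeUnit.SECONDS) returns a boolean (false if the timeout elapsed first) and throws InterruptedException, so the enclosing method has to handle or declare it. Here is a rough JDK-only sketch of the same handshake with both cases handled. A plain daemon thread stands in for the scheduler, the try/finally plays the role of doFinally, and LatchWaitDemo and countWithLatch are hypothetical names:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

class LatchWaitDemo {

    // Counts n items on a daemon thread and waits up to timeoutSeconds for the result.
    // Returns the count, or -1 if the wait timed out or was interrupted.
    static long countWithLatch(int n, long timeoutSeconds) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicLong result = new AtomicLong(-1);

        Thread worker = new Thread(() -> {
            try {
                result.set(LongStream.range(0, n).count());
            } finally {
                latch.countDown(); // same role as doFinally: runs on success or failure
            }
        }, "worker");
        worker.setDaemon(true);    // like the main thread in the question, nothing keeps the JVM alive for it
        worker.start();

        try {
            // await returns false if the timeout elapsed before countDown() was called
            boolean completed = latch.await(timeoutSeconds, TimeUnit.SECONDS);
            return completed ? result.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(countWithLatch(1_000_000, 10)); // prints 1000000
    }
}
```

Checking the return value lets you tell "finished" apart from "gave up waiting", which the bare await call cannot do.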
Simon Baslé answered Sep 17 '22 22:09