Project Reactor 3.1.5.RELEASE
Consider this:
Flux.range(0, 10)
.publishOn(Schedulers.parallel())
.subscribe(i -> LOG.info(i));
I am expecting the subscriber to run in multiple threads, but it runs only in one:
2018-03-26 12:30:08.693 INFO 89770 --- [ parallel-1] d.a.Application : 0
2018-03-26 12:30:08.693 INFO 89770 --- [ parallel-1] d.a.Application : 1
2018-03-26 12:30:08.693 INFO 89770 --- [ parallel-1] d.a.Application : 2
2018-03-26 12:30:08.693 INFO 89770 --- [ parallel-1] d.a.Application : 3
2018-03-26 12:30:08.694 INFO 89770 --- [ parallel-1] d.a.Application : 4
2018-03-26 12:30:08.694 INFO 89770 --- [ parallel-1] d.a.Application : 5
2018-03-26 12:30:08.694 INFO 89770 --- [ parallel-1] d.a.Application : 6
2018-03-26 12:30:08.694 INFO 89770 --- [ parallel-1] d.a.Application : 7
2018-03-26 12:30:08.694 INFO 89770 --- [ parallel-1] d.a.Application : 8
2018-03-26 12:30:08.694 INFO 89770 --- [ parallel-1] d.a.Application : 9
The documentation tells my expectations are correct (http://projectreactor.io/docs/core/release/reference/#threading). Could someone explain to me what's going on there?
ParallelFlux is created from an existing Flux , using the parallel operator. By default, this splits the stream into the total number of CPU cores that are available. ParallelFlux only divides the stream, and does not change the execution model. Instead, it executes the streams on the default thread—the main thread.
Mono is more relatable to the Optional class in Java since it contains 0 or 1 value, and Flux is more relatable to List since it can have N number of values.
To perform parallel execution using ParallelFlux , first you need to convert a Flux<T> into a ParallelFlux<T> . After that, you have to call runOn(Scheduler) to specify in which scheduler the elements should be processed. Without that, it cannot be processed in parallel.
A Flux object represents a reactive sequence of 0.. N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.
Reactive flows are sequential in nature and publishOn
just tells the source where to emit each value one after the other. You need to tell the flow to go parallel via parallel
, then specify the scheduler via runOn
:
Flux.range(0, 10)
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(i -> LOG.info(i))
.sequential()
.subscribe();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With