Project Reactor parallel execution

Project Reactor 3.1.5.RELEASE

Consider this:

Flux.range(0, 10)
    .publishOn(Schedulers.parallel())
    .subscribe(i -> LOG.info(i));

I expect the subscriber to run on multiple threads, but it runs on only one:

2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 0
2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 1
2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 2
2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 3
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 4
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 5
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 6
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 7
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 8
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 9

According to the documentation, my expectation is correct (http://projectreactor.io/docs/core/release/reference/#threading). Could someone explain what is going on here?

Mikhail Kadan asked Mar 26 '18 10:03


1 Answer

Reactive flows are sequential by nature. publishOn only moves emission onto a single worker of the given scheduler, where the values are still delivered one after the other. To process values in parallel, first split the flow into rails with parallel(), then specify the scheduler for those rails with runOn:

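Flux.range(0, 10)
    .parallel()                    // split into rails (one per CPU core by default)
    .runOn(Schedulers.parallel())  // run each rail on its own scheduler worker
    .doOnNext(i -> LOG.info(i))
    .sequential()                  // merge the rails back into a single Flux
    .subscribe();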
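
To check the difference yourself, a minimal sketch along the following lines records which threads handle the elements in each variant. The class name ThreadingDemo and both helper methods are hypothetical names chosen for illustration, and the sketch assumes reactor-core is on the classpath. It uses blockLast() instead of subscribe() only so that the calling thread waits for the flow to finish.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of Reactor.
public class ThreadingDemo {

    // Names of the threads that handle elements downstream of publishOn.
    static Set<String> threadsWithPublishOn() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Flux.range(0, 10)
            .publishOn(Schedulers.parallel())
            .doOnNext(i -> names.add(Thread.currentThread().getName()))
            .blockLast(); // wait for completion so the set is fully populated
        return names;
    }

    // Names of the threads that handle elements on the parallel rails.
    static Set<String> threadsWithParallelRunOn() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Flux.range(0, 10)
            .parallel()
            .runOn(Schedulers.parallel())
            .doOnNext(i -> names.add(Thread.currentThread().getName()))
            .sequential()
            .blockLast();
        return names;
    }

    public static void main(String[] args) {
        System.out.println("publishOn:        " + threadsWithPublishOn());
        System.out.println("parallel + runOn: " + threadsWithParallelRunOn());
    }
}
```

The first set should contain exactly one thread name, because publishOn takes a single worker from the scheduler. The second set usually contains several names on a multi-core machine. How many depends on the number of rails and on the size of the scheduler's thread pool.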

akarnokd answered Oct 05 '22 04:10