I have streams of incoming events that need to be enriched, and then processed in parallel as they arrive.
I was thinking Project Reactor was made to order for the job, but in my tests all of the processing seems to be done serially.
Here is some test code:
ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
.map(i-> {
System.out.println("ReactorTests.test " + Thread.currentThread());
sleep(1000L); // simulate IO delay
return String.format("String %d", i);
})
.take(3)
// .subscribeOn(Schedulers.elastic());
// .subscribeOn(Schedulers.newParallel("test"));
// .subscribeOn(Schedulers.fromExecutor(executor));
;
tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread()),
System.out::println,
()-> System.out.println("Done"));
System.out.println("DONE AND DONE");
I have tried uncommenting each of the commented lines, however in every case the output indicates that the same thread is used to process all of the events
Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done
(The only difference is that without the Schedulers, they are run on the subscribe thread, whereas with any of the executors, they all run in the same thread, which is not the subscribe thread.)
What am I missing?
FYI, there is a "sleep" method:
public static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
System.out.println("Exiting");
}
}
One way to handle items in parallel, is to use .parallel
/ .runOn
flux
.parallel(10)
.runOn(scheduler)
//
// Work to be performed in parallel goes here. (e.g. .map, .flatMap, etc)
//
// Then, if/when you're ready to go back to sequential, call .sequential()
.sequential()
Blocking operations (such as blocking IO, or Thread.sleep
) will block the thread on which they are executed. Reactive streams cannot magically turn a blocking method into a non-blocking method. Therefore, you need to ensure blocking methods are run on a Scheduler
suitable for blocking operations (e.g. Schedulers.boundedElastic()
).
In the example above, since you know you are calling a blocking operation, you could use .runOn(Schedulers.boundedElastic())
.
Depending on the use case, you can also use async operators like .flatMap
in combination with .subscribeOn
or .publishOn
to delegate specific blocking operations to another Scheduler
, as described in the project reactor docs. For example:
flux
.flatMap(i -> Mono.fromCallable(() -> {
System.out.println("ReactorTests.test " + Thread.currentThread());
sleep(1000L); // simulate IO delay
return String.format("String %d", i);
})
.subscribeOn(Schedulers.boundedElastic()))
In fact, .flatMap
also has an overloaded variant that takes a concurrency
parameter where you can limit the maximum number of in-flight inner sequences. This can be used instead of .parallel
in some use cases. It will not generally work for Flux.interval
though, since Flux.interval
doesn't support downstream requests that replenish slower than the ticks.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With