I'm new to Project Reactor and trying to understand the difference between the boundedElastic() and parallel() schedulers. The documentation says that boundedElastic() is used for blocking tasks and parallel() for non-blocking tasks.
Why does Project Reactor need to address blocking scenarios at all, given that it is non-blocking in nature? Can someone please help me out with some real-world use cases for the boundedElastic() vs parallel() schedulers?
The parallel flavor is backed by N workers (one per CPU core), each based on a ScheduledExecutorService. If you submit N long-lived tasks to it, no more work can be executed until one of them finishes, hence the affinity for short-lived tasks.
The elastic flavor is also backed by workers based on ScheduledExecutorService, except that it creates these workers on demand and pools them. BoundedElastic is the same as elastic, except that you can limit the total number of threads.
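The starvation effect described above can be reproduced without Reactor. The following is only an analogy built on a plain JDK fixed-size pool, not Reactor's actual worker implementation; the class and method names are made up for illustration. It fills every worker with a task that blocks and then checks whether a trivial task still gets to run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: a JDK fixed pool stands in for a scheduler with N workers.
class FixedPoolStarvation {

    /** Returns true if a short task could not run while all workers were blocked. */
    static boolean shortTaskStarved(int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy every worker with a task that blocks until released.
            for (int i = 0; i < workers; i++) {
                pool.submit(() -> {
                    release.await();
                    return null;
                });
            }
            // This short task is queued behind the blocked workers.
            Future<String> quick = pool.submit(() -> "done");
            quick.get(200, TimeUnit.MILLISECONDS);
            return false; // it ran, so there was a free worker
        } catch (TimeoutException e) {
            return true; // still waiting: every worker is blocked
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // unblock the long-running tasks
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("starved = " + shortTaskStarved(2));
    }
}
```

With a pool that can grow on demand, the short task would get its own thread instead of waiting, which is the trade-off the elastic flavors make.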
https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers
TL;DR
Reactor executes non-blocking/async tasks on a small number of threads. If a task blocks, its thread is blocked and all other tasks scheduled on that thread have to wait for it.
- parallel should be used for fast non-blocking operations (default option)
- boundedElastic should be used to "offload" blocking tasks
In general, the Reactor API is concurrency-agnostic and uses the Schedulers abstraction to execute tasks. Schedulers have responsibilities very similar to ExecutorService.
Schedulers.parallel()
Should be the default option, used for fast non-blocking operations on a small number of threads. By default, the number of threads is equal to the number of CPU cores. It can be controlled by the reactor.schedulers.defaultPoolSize system property.
Schedulers.boundedElastic()
Used to execute longer operations (blocking tasks) as part of the reactive flow. It uses a thread pool whose default size is the number of CPU cores x 10 (can be controlled by reactor.schedulers.defaultBoundedElasticSize) and a default queue size of 100000 per thread (can be controlled by reactor.schedulers.defaultBoundedElasticQueueSize).
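To see which values are in effect on a given machine, the defaults can be read from the public constants on Schedulers. This is a small sketch assuming reactor-core 3.3 or later is on the classpath; the class and helper method names are made up for illustration.

```java
import reactor.core.scheduler.Schedulers;

// Illustrative helper that reads the effective scheduler defaults.
class SchedulerDefaults {

    // Size of the shared parallel() pool (reactor.schedulers.defaultPoolSize).
    static int parallelPoolSize() {
        return Schedulers.DEFAULT_POOL_SIZE;
    }

    // Thread cap of the shared boundedElastic() scheduler
    // (reactor.schedulers.defaultBoundedElasticSize).
    static int boundedElasticThreadCap() {
        return Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;
    }

    // Queue cap of the shared boundedElastic() scheduler
    // (reactor.schedulers.defaultBoundedElasticQueueSize).
    static int boundedElasticQueueCap() {
        return Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
    }

    public static void main(String[] args) {
        System.out.println("parallel pool size       = " + parallelPoolSize());
        System.out.println("boundedElastic threads   = " + boundedElasticThreadCap());
        System.out.println("boundedElastic queue cap = " + boundedElasticQueueCap());
    }
}
```

These constants are initialized once when the Schedulers class loads, so the system properties have to be set before that point (for example with -D on the command line).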
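subscribeOn or publishOn can be used to change the scheduler. subscribeOn affects where the subscription (and therefore the source) runs, while publishOn switches the thread for the operators that come after it.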
The following code shows how to wrap a blocking operation:
Mono.fromCallable(() -> {
// blocking operation
}).subscribeOn(Schedulers.boundedElastic()); // run on a separate scheduler because code is blocking
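A fuller, runnable version of the same idea might look like the sketch below. It assumes reactor-core is on the classpath; blockingLookup() is a hypothetical stand-in for a real blocking call (JDBC, file I/O, a legacy HTTP client), and the class name is made up. It records which thread ran the blocking part and which thread ran the operator after publishOn.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class OffloadBlockingCall {

    // Hypothetical blocking call; it returns the name of the thread it ran on.
    static String blockingLookup() throws InterruptedException {
        Thread.sleep(100);
        return Thread.currentThread().getName();
    }

    /** Returns {thread that ran the blocking call, thread that ran the map step}. */
    static String[] run() {
        return Mono.fromCallable(OffloadBlockingCall::blockingLookup)
                // The source is subscribed, and therefore executed, on boundedElastic.
                .subscribeOn(Schedulers.boundedElastic())
                // Everything after this point continues on the parallel scheduler.
                .publishOn(Schedulers.parallel())
                .map(sourceThread -> new String[] {sourceThread, Thread.currentThread().getName()})
                .block(); // blocking here is fine: this is the main thread, not a Reactor thread
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("blocking call ran on: " + threads[0]);
        System.out.println("map step ran on:      " + threads[1]);
    }
}
```

The first name should start with boundedElastic- and the second with parallel-, which shows the blocking work kept off the small parallel pool.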
Schedulers.newBoundedElastic()
Similar to Schedulers.boundedElastic(), but useful when you need a separate, dedicated thread pool for some operation.
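As a rough sketch, a dedicated pool for one kind of blocking work could be set up as follows. The sizing (4 threads, 100 queued tasks) and the "legacy-io" name are arbitrary values chosen for illustration, and the class name is made up; reactor-core is assumed to be on the classpath.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class DedicatedBlockingPool {

    /** Runs a task on a dedicated scheduler and returns the worker thread's name. */
    static String runOnDedicatedPool() {
        // Arbitrary example sizing: thread cap 4, queued task cap 100, thread name prefix "legacy-io".
        Scheduler legacyIo = Schedulers.newBoundedElastic(4, 100, "legacy-io");
        try {
            return Mono.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(legacyIo)
                    .block();
        } finally {
            // Schedulers created with new* are not shared, so the caller disposes them.
            legacyIo.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on: " + runOnDedicatedPool());
    }
}
```

In a real application the scheduler would normally be created once and reused, rather than created and disposed per call as in this short demo.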
Sometimes it's not obvious which code is blocking. One very useful tool for testing reactive code is BlockHound.