Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Difference between boundedElastic() vs parallel() scheduler

I'm new to Project reactor and trying to understand difference between boundedElastic() vs parallel() scheduler. Documentation says that boundedElastic() is used for blocking tasks and parallel() for non-blocking tasks.

Why do Project reactor need to address blocking scenario as they are non-blocking in nature. Can someone please help me out with some real world use case for boundedElastic() vs parallel() scheduler ?

like image 450
MNK Avatar asked Apr 19 '20 13:04

MNK


2 Answers

The parallel flavor is backed by N workers (according to the N cpus) each based on a ScheduledExecutorService. If you submit N long lived tasks to it, no more work can be executed, hence the affinity for short-lived tasks.

The elastic flavor is also backed by workers based on ScheduledExecutorService, except it creates these workers on demand and pools them. BoundedElastic is same as elastic, difference is that you can limit the total no. of threads.

https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers

like image 147
Hardik Garg Avatar answered Sep 20 '22 22:09

Hardik Garg


TL;DR

Reactor executes non-blocking/async tasks on a small number of threads. In case task is blocking - thread would be blocked and all other tasks would be waiting for it.

  • parallel should be used for fast non-blocking operation (default option)
  • boundedElastic should be used to "offload" blocking tasks

In general Reactor API is concurrency-agnostic that use Schedulers abstraction to execute tasks. Schedulers have responsibilities very similar to ExecutorService.

Schedulers.parallel()

Should be a default option and used for fast non-blocking operation on a small number of threads. By default, number of threads is equal to number of CPU cores. It could be controlled by reactor.schedulers.defaultPoolSize system property.

Schedulers.boundedElastic()

Used to execute longer operations (blocking tasks) as a part of the reactive flow. It will use thread pool with a default number of threads number of CPU cores x 10 (could be controlled by reactor.schedulers.defaultBoundedElasticSize) and default queue size of 100000 per thread (reactor.schedulers.defaultBoundedElasticSize).

subscribeOn or publishOn could be used to change the scheduler.

The following code shows how to wrap blocking operation

Mono.fromCallable(() -> {
    // blocking operation
}).subscribeOn(Schedulers.boundedElastic()); // run on a separate scheduler because code is blocking

Schedulers.newBoundedElastic() Similar to Schedulers.boundedElastic() but is useful when you need to create a separate thread pool for some operation.

Sometimes it's not obvious what code is blocking. One very useful tool while testing reactive code is BlockHound

like image 39
Alex Avatar answered Sep 20 '22 22:09

Alex