I am working on Flux and Mono and using them in multi threaded environment and using the Schedular which provide the worker thread.
There are many options to start the Schedular using elastic, parallel and newElastic.
Here is the code which i used:
System.out.println("------ elastic --------- ");
Flux.range(1, 10)
.map(i -> i / 2)
.publishOn(Schedulers.elastic()).log()
.blockLast();
System.out.println("------ new elastic --------- ");
Flux.range(1, 10)
.map(i -> i / 2).log()
.publishOn(Schedulers.newElastic("my")).log()
.blockLast();
and both of them have the same documentation:
Scheduler that dynamically creates ExecutorService-based Workers and caches the thread pools, reusing them once the Workers have been shut down.
The maximum number of created thread pools is unbounded.
The default time-to-live for unused thread pools is 60 seconds, use the appropriate factory to push a different value.
This scheduler is not restartable.
and here is the logs for both of them:
------ elastic ---------
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (elastic-2) | onNext(0)
[ INFO] (elastic-2) | onNext(1)
[ INFO] (elastic-2) | onNext(1)
[ INFO] (elastic-2) | onNext(2)
[ INFO] (elastic-2) | onNext(2)
[ INFO] (elastic-2) | onNext(3)
[ INFO] (elastic-2) | onNext(3)
[ INFO] (elastic-2) | onNext(4)
[ INFO] (elastic-2) | onNext(4)
[ INFO] (elastic-2) | onNext(5)
[ INFO] (elastic-2) | onComplete()
------ new elastic ---------
[ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (main) | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | request(256)
[ INFO] (main) | onNext(0)
[ INFO] (main) | onNext(1)
[ INFO] (my-4) | onNext(0)
[ INFO] (main) | onNext(1)
[ INFO] (my-4) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (my-4) | onNext(1)
[ INFO] (my-4) | onNext(2)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (my-4) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (my-4) | onNext(3)
[ INFO] (my-4) | onNext(3)
[ INFO] (main) | onNext(4)
[ INFO] (my-4) | onNext(4)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(5)
[ INFO] (my-4) | onNext(4)
[ INFO] (main) | onComplete()
[ INFO] (my-4) | onNext(5)
[ INFO] (my-4) | onComplete()
What is the difference between the two?
boundedElastic() : Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) is capped. immediate() : to immediately run submitted Runnable instead of scheduling them (somewhat of a no-op or "null object" Scheduler )
Fluxion Scheduler in Flux Fluxion introduces queuing and resource matching services to extend Flux to provide advanced batch scheduling. Jobs are submitted to Flux as usual, and Fluxion makes a schedule to assign available resources to the job requests according to its configured algorithm.
The elastic()
function returns a shared scheduler instance. This means that multiple calls to this function will return the same scheduler.
The functions prefixed with new
will always create a new scheduler instance.
Check the docs for the Schedulers
class here: https://projectreactor.io/docs/core/release/api/
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With