I am using both publishOn and subscribeOn on the same Flux, as follows:
System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
    .map(i -> i * 2)
    .log()
    .publishOn(Schedulers.elastic())
    .subscribeOn(Schedulers.parallel())
    .subscribe(elements::add);
System.out.println("-------------------------------------");
However, when I use both, nothing is printed in the logs. When I use only publishOn, I get the following info logs:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Is publishOn recommended over subscribeOn, or does it take precedence over subscribeOn? What is the difference between the two, and when should each be used?
A Flux object represents a reactive sequence of 0.. N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.
Mono subscribe(): The subscribe() method with no arguments subscribes to the Mono and requests data from the publisher. It does not consume the data and has no error-handling mechanism.
A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error). It is intended to be used in implementations and return types. Input parameters should keep using raw Publisher as much as possible.
boundedElastic(): optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) is capped.
immediate(): immediately runs the submitted Runnable instead of scheduling it (somewhat of a no-op or "null object" Scheduler).
It took me some time to understand it, maybe because publishOn is usually explained before subscribeOn, so here's a hopefully simpler layman's explanation.
subscribeOn means running the initial source emission, e.g. subscribe(), onSubscribe() and request(), on a worker of the specified scheduler (another thread). The same applies to any subsequent operations such as onNext/onError/onComplete, map, etc. This happens no matter where subscribeOn() is placed in the chain.
If you don't call publishOn anywhere in the fluent chain, then that's it: everything runs on that thread.
But as soon as you call publishOn(), let's say in the middle, every subsequent operator runs on a worker of the scheduler supplied to that publishOn().
Here's an example:
Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());
Flux.range(1, 5)
    .doOnNext(consumer)
    .map(i -> {
        System.out.println("Inside map the thread is " + Thread.currentThread().getName());
        return i * 10;
    })
    .publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
    .doOnNext(consumer)
    .publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
    .doOnNext(consumer)
    .subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
    .subscribe();
The result would be:
1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5
As you can see, the first doOnNext() and the following map() run on the thread called subscribeOn_thread. That continues until a publishOn() is called; from then on, every subsequent call runs on the scheduler supplied to that publishOn(), and this holds until another publishOn() is called.
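This may also explain why the snippet in the question printed nothing once subscribeOn was added. In that chain log() runs on the subscribeOn thread and subscribe() returns at once. Reactor's default scheduler threads are daemon threads, so a short-lived main method can end before any signal is logged. That is one plausible explanation, assuming the code ran in such a method. The sketch below reproduces the effect with plain java.util.concurrent rather than Reactor; the class AsyncSubscribeSketch, its sizes() helper and the latch that forces the unlucky ordering are all made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative analogy only (hypothetical names); this is not Reactor code.
public class AsyncSubscribeSketch {

    /** Returns {size seen right after "subscribing", size seen after waiting}. */
    static int[] sizes() {
        // Daemon worker thread, standing in for a scheduler thread.
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "worker");
            t.setDaemon(true);
            return t;
        });
        List<Integer> elements = new CopyOnWriteArrayList<>();
        CountDownLatch callerLooked = new CountDownLatch(1); // forces the unlucky ordering
        CountDownLatch done = new CountDownLatch(1);

        // Like subscribe() behind subscribeOn: execute() returns immediately.
        worker.execute(() -> {
            try {
                callerLooked.await();
                for (int i = 1; i <= 4; i++) {
                    elements.add(i * 2);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });

        int rightAfterSubscribe = elements.size(); // the worker has not emitted yet
        callerLooked.countDown();
        try {
            done.await(); // the explicit wait that the question's snippet lacks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int afterWaiting = elements.size();
        worker.shutdown();
        return new int[] {rightAfterSubscribe, afterWaiting};
    }

    public static void main(String[] args) {
        int[] s = sizes();
        System.out.println(s[0] + " element(s) before waiting, " + s[1] + " after");
    }
}
```

In Reactor itself, the role of done.await() could be played by something like blockLast() or a CountDownLatch counted down in the completion callback, so that the calling thread stays alive until the sequence completes.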
Here is a short piece of documentation I found:
publishOn applies in the same way as any other operator, in the middle of the subscriber chain. It takes signals from upstream and replays them downstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators will execute (until another publishOn is chained in).
subscribeOn applies to the subscription process, when that backward chain is constructed. As a consequence, no matter where you place the subscribeOn in the chain, it always affects the context of the source emission. However, this does not affect the behavior of subsequent calls to publishOn. They still switch the execution context for the part of the chain after them.
and
publishOn forces the next operator (and possibly subsequent operators after the next one) to run on a different thread. Similarly, subscribeOn forces the previous operator (and possibly operators prior to the previous one) to run on a different thread.
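The two rules quoted above can be modelled without Reactor. What follows is a toy sketch built on plain executors, not Reactor's implementation; ThreadHopSketch, named() and the thread names are hypothetical. The "subscribeOn" part decides where the source loop starts, and the "publishOn" part re-dispatches each value to a second executor, so only the stage after it changes thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model only (hypothetical names); this is not how Reactor is implemented.
public class ThreadHopSketch {

    /** Single-thread executor whose daemon thread carries a recognizable name. */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    /** Runs the pipeline and returns a trace of "stage:value@thread" entries. */
    static List<String> run() {
        ExecutorService subscribeThread = named("subscribe-thread");
        ExecutorService publishThread = named("publish-thread");
        List<String> trace = new CopyOnWriteArrayList<>();
        int count = 3;
        CountDownLatch done = new CountDownLatch(count);

        // Downstream stage: whatever comes after the "publishOn" hop.
        Consumer<Integer> downstream = v -> {
            trace.add("after:" + v + "@" + Thread.currentThread().getName());
            done.countDown();
        };

        // "publishOn": hand every value to another executor before passing it on.
        Consumer<Integer> hop = v -> publishThread.execute(() -> downstream.accept(v));

        // "subscribeOn": start the source itself on the chosen executor.
        subscribeThread.execute(() -> {
            for (int i = 1; i <= count; i++) {
                int mapped = i * 10; // upstream stage, runs where the source runs
                trace.add("before:" + mapped + "@" + Thread.currentThread().getName());
                hop.accept(mapped);
            }
        });

        // execute() returned immediately, so the caller has to wait explicitly.
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        subscribeThread.shutdown();
        publishThread.shutdown();
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every "before" entry is recorded on subscribe-thread and every "after" entry on publish-thread. How the two kinds of entries interleave can vary from run to run, as it does in the Reactor output shown earlier.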