I'm confused about when doOnNext is triggered: is it before or after the Publisher (Flux/Mono) emits an element?
You can use the hasElements method of Flux to check whether a Flux completes empty. It returns a Mono<Boolean> that emits true if the Flux sequence has at least one element, and false if it completes empty.
Mono and Flux are both reactive streams. They differ in what they express. A Mono is a stream of 0 to 1 element, whereas a Flux is a stream of 0 to N elements.
Flux.interval(Duration) produces a Flux that is infinite and emits regular ticks from a clock. So your first example has no concurrency: everything happens on the same thread. The subscription and all events must complete before the method and process end. By using Flux.
A Flux object represents a reactive sequence of 0..N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.
It's after the publication of the element, and it has to be after by definition; otherwise the Consumer passed to doOnNext wouldn't have access to the element emitted.
However, the doOnNext() callback runs before the subscriber receives that element. For example:
Flux.just("first", "second")
.doOnNext(x -> System.out.println(x + " onNext"))
.subscribe(System.out::println);
...would output:
first onNext
first
second onNext
second
I am afraid you haven't understood reactive properly, and that's perfectly fine :). In Rx, the subscription goes bottom-up. Consider the following code:
Flux<Integer> f = Flux.fromIterable(Arrays.asList(1,2,3));
Since there is no subscriber, nothing happens. No item would be emitted. Now consider this:
f.subscribe(System.out::println);
What happens is that the subscriber subscribes to the flux, and items are emitted.
Now consider this:
f.doOnNext(System.out::println).subscribe(System.out::println);
Remember, the subscription happens in a bottom-up manner. So over here, the subscriber subscribes to the doOnNext(), and the doOnNext() subscribes to the original flux, which then starts emitting events. doOnNext() then intercepts each event and performs some side effect before passing it on to the subscriber.
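To make that order concrete, here is a minimal sketch of the idea in plain Java. It is a simplified, hypothetical model and not Reactor's actual implementation: SimplePublisher, just, doOnNext and run are names invented for illustration, and the model ignores backpressure, completion and errors. It only shows that subscription travels from the subscriber up to the source, while elements travel from the source down through the doOnNext stage to the subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of a reactive chain. NOT Reactor's real classes.
final class BottomUpSketch {

    // A publisher does nothing until something subscribes to it.
    interface SimplePublisher<T> {
        void subscribe(Consumer<T> downstream);
    }

    // Source stage: once subscribed, pushes every item to the downstream consumer.
    static <T> SimplePublisher<T> just(List<T> items, List<String> log) {
        return downstream -> {
            log.add("source subscribed");
            for (T item : items) {
                downstream.accept(item);
            }
        };
    }

    // doOnNext stage: when subscribed, it subscribes to its upstream;
    // for each item it runs the side effect first, then forwards the item.
    static <T> SimplePublisher<T> doOnNext(SimplePublisher<T> upstream,
                                           Consumer<T> sideEffect,
                                           List<String> log) {
        return downstream -> {
            log.add("doOnNext subscribed");
            upstream.subscribe(item -> {
                sideEffect.accept(item);
                downstream.accept(item);
            });
        };
    }

    // Builds the chain, subscribes, and returns the recorded order of events.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimplePublisher<Integer> source = just(List.of(1, 2), log);
        SimplePublisher<Integer> peeked =
                doOnNext(source, x -> log.add(x + " onNext"), log);
        log.add("assembled"); // the chain is built, but nothing has been emitted yet
        peeked.subscribe(x -> log.add("subscriber got " + x));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints "assembled" first, then "doOnNext subscribed" before "source subscribed" (bottom-up subscription), and after that each item's "onNext" line appears just before the matching "subscriber got" line (top-down emission).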