
Java Reactor Flux/Mono: is doOnNext triggered before or after an element is emitted?

I am confused about when doOnNext is triggered: before or after the Publisher (Flux/Mono) emits an element?

dvsakgec asked Jul 25 '19 14:07




2 Answers

It is triggered after the element is emitted, and it has to be by definition: otherwise the Consumer passed to doOnNext wouldn't have access to the emitted element.

However, the doOnNext() callback runs before the subscriber receives the element. For example:

Flux.just("first", "second")
        .doOnNext(x -> System.out.println(x + " onNext"))
        .subscribe(System.out::println);

...would output:

first onNext
first
second onNext
second
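
The question also asks about Mono. As a minimal sketch (the class name MonoDoOnNextOrder and the recorded strings are made up for illustration; only Mono.just, doOnNext and subscribe are Reactor API), the same ordering can be checked by recording events in a list instead of printing them:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

// Hypothetical demo class; assumes reactor-core is on the classpath.
class MonoDoOnNextOrder {

    // Records the order in which the doOnNext callback and the subscriber see the value.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Mono.just("only")
                .doOnNext(x -> events.add(x + " doOnNext"))      // side effect, runs first
                .subscribe(x -> events.add(x + " subscriber"));  // final consumer, runs second
        return events;
    }

    public static void main(String[] args) {
        // Prints "only doOnNext" and then "only subscriber".
        run().forEach(System.out::println);
    }
}
```

Mono.just emits synchronously on subscription, so the list is already complete when run() returns.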
Michael Berry answered Sep 30 '22 15:09



I am afraid you haven't understood reactive programming properly, and that's perfectly fine :). In Rx, subscription happens bottom-up. Consider the following code:

Flux<Integer> f = Flux.fromIterable(Arrays.asList(1,2,3));

Since there is no subscriber, nothing happens: no item is emitted. Now consider this:

f.subscribe(System.out::println);

What happens is that the subscriber subscribes to the flux, and items are emitted.
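
To make the "nothing happens until someone subscribes" point observable, here is a small sketch. The class name LazyFluxDemo and the counter are assumptions for illustration; the Reactor calls are Flux.fromIterable, doOnNext and the no-argument subscribe().

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Hypothetical demo class; assumes reactor-core is on the classpath.
class LazyFluxDemo {

    // Returns the number of items seen before and after subscribing.
    static int[] run() {
        AtomicInteger seen = new AtomicInteger();

        // Building the pipeline does not emit anything yet.
        Flux<Integer> f = Flux.fromIterable(Arrays.asList(1, 2, 3))
                .doOnNext(x -> seen.incrementAndGet());
        int beforeSubscribe = seen.get();

        // Subscribing starts the emission; fromIterable emits synchronously.
        f.subscribe();
        int afterSubscribe = seen.get();

        return new int[] {beforeSubscribe, afterSubscribe};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("before subscribe: " + counts[0]); // 0
        System.out.println("after subscribe: " + counts[1]);  // 3
    }
}
```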

Now consider this:

f.doOnNext(System.out::println).subscribe(System.out::println);

Remember, the subscription happens in a bottom-up manner. So over here, the subscriber subscribes to the doOnNext(), and the doOnNext() subscribes to the original flux, which then starts emitting events. doOnNext() then intercepts each event and performs its side effect before passing the event on to the subscriber.
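
The bottom-up wiring can be sketched without Reactor at all. The following is a toy model under loud assumptions: Source, fromItems and this doOnNext helper are hypothetical names, and this is not how Reactor is actually implemented (there is no backpressure, completion or error handling here). It only illustrates why the side effect runs before the subscriber sees each item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: these types are hypothetical and are NOT Reactor's classes.
class BottomUpSketch {

    // A source that pushes its items to whoever subscribes.
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // Emits the given items only when subscribe is called.
    @SafeVarargs
    static <T> Source<T> fromItems(List<String> log, T... items) {
        return downstream -> {
            log.add("source subscribed");
            for (T item : items) {
                downstream.accept(item);
            }
        };
    }

    // Models doOnNext: when subscribed to, it subscribes upstream with a wrapper
    // that runs the side effect first and then forwards the item downstream.
    static <T> Source<T> doOnNext(List<String> log, Source<T> upstream, Consumer<T> sideEffect) {
        return downstream -> {
            log.add("doOnNext subscribed");
            upstream.subscribe(item -> {
                sideEffect.accept(item);
                downstream.accept(item);
            });
        };
    }

    // Wires source -> doOnNext -> subscriber and returns the recorded event order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Source<Integer> f = fromItems(log, 1, 2);
        doOnNext(log, f, x -> log.add(x + " doOnNext"))
                .subscribe(x -> log.add(x + " subscriber"));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the log starts with "doOnNext subscribed" and then "source subscribed" (subscription travels from the bottom of the chain to the top), followed by "1 doOnNext", "1 subscriber", "2 doOnNext", "2 subscriber" (items travel from the top down).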

Prashant Pandey answered Sep 30 '22 15:09
