Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

what does Mono.defer() do?

I've come across Mono.defer() in some Spring webflux code

I looked up the method in the docs but don't understand the explanation:

"Create a Mono provider that will supply a target Mono to subscribe to for each Subscriber downstream"

please could I have an explanation and an example. Is there a place with a bunch of Reactor example code (their unit tests?) that I might reference.

thanks

like image 852
James Render Avatar asked May 02 '19 15:05

James Render


People also ask

What is the use of mono defer?

when you run Mono. just() it creates immediately an Observable(Mono)and reuses it but when you use defer it doesn't create it immediately it creates a new Observable in every subscribe.

What is mono in spring reactive programming?

A Mono<T> is a Reactive Streams Publisher , also augmented with a lot of operators that can be used to generate, transform, orchestrate Mono sequences. It is a specialization of Flux that can emit at most 1 <T> element: a Mono is either valued (complete with element), empty (complete without element) or failed (error).

What is the difference between flux and Mono?

A Flux object represents a reactive sequence of 0.. N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.

What is mono in spring Webflux?

Mono — A publisher that can emit 0 or 1 element. Flux — A publisher that can emit 0.. N elements.


3 Answers

It is a bit of an oversimplification but conceptually Reactor sources are either lazy or eager. More advanced ones, like an HTTP request, are expected to be lazily evaluated. On the other side the most simple ones like Mono.just or Flux.fromIterable are eager.

By that, I mean that calling Mono.just(System.currentTimeMillis()) will immediately invoke the currentTimeMillis() method and capture the result. Said result is only emitted by the Mono once it is subscribed to. Subscribing multiple times doesn't change the value either:

Mono<Long> clock = Mono.just(System.currentTimeMillis());
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //we use block for demonstration purposes, returns t0

Thread.sleep(7_000);
//time == t17
clock.block(); //we re-subscribe to clock, still returns t0

The defer operator is there to make this source lazy, re-evaluating the content of the lambda each time there is a new subscriber:

Mono<Long> clock = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //invoked currentTimeMillis() here and returns t10

Thread.sleep(7_000);
//time == t17
clock.block(); //invoke currentTimeMillis() once again here and returns t17
like image 60
Simon Baslé Avatar answered Oct 09 '22 14:10

Simon Baslé


with simple words if you see in the first view it is like Mono.just() but is not. when you run Mono.just() it creates immediately an Observable(Mono)and reuses it but when you use defer it doesn't create it immediately it creates a new Observable in every subscribe.

One use case to see the difference

    int a = 5;
@Override
public void run(String... args) throws Exception {

    Mono<Integer> monoJust = Mono.just(a);
    Mono<Integer> monoDefer = Mono.defer(() -> Mono.just(a));

    monoJust.subscribe(integer1 -> System.out.println(integer1));
    monoDefer.subscribe(integer1 -> System.out.println(integer1));

    a = 7;
    monoJust.subscribe(integer1 -> System.out.println(integer1));
    monoDefer.subscribe(integer1 -> System.out.println(integer1));
}

print:

5
5
5
7

if you see mono.just has created the observable immediately and it doesn't change even if the value has changed but the defer create the observable in subscribe so you will work with the current onSubscribe value

like image 30
Ricard Kollcaku Avatar answered Oct 09 '22 15:10

Ricard Kollcaku


I was attempting defer for a different use case. Wrote the below code to check and sharing as it might help others. My use case was to chain two Monos and ensure that the first one is complete before the second one is taken up. And second one contained a blocking call whose result is used to respond a Mono either with empty or error response. Without defer, my blocking call is executed irrespective of the result of first one. But while using defer the blocking call is executed only when the first Mono completes. Code below:

public static void main(String[] args) {
    long cur = System.currentTimeMillis();
    boolean succeed = true;

    Mono<Integer> monoJust = Mono.create(consumer -> {
        System.out.println("MonoJust inside " + (System.currentTimeMillis() - cur));
        if (succeed) {
            consumer.success(1);
        } else {
            consumer.error(new RuntimeException("aaa"));
        }
    });

    Mono<String> monoJustStr = Mono.create(consumer -> {
        System.out.println("MonoJustStr inside: " + (System.currentTimeMillis() - cur));
        consumer.success("one");
    });

    System.out.println("##1##: Begin");
    monoJust.then(evaluator() ? Mono.empty() : monoJustStr).subscribe(d -> System.out.println("##1##: "+d), e-> System.err.println(e));
    System.out.println("##1##: Done: "+(System.currentTimeMillis() - cur));

    System.out.println("\n\n\n##2##: Begin");
    monoJust.then(Mono.defer(() -> evaluator() ? Mono.empty() : monoJustStr)).subscribe(d -> System.out.println("##2##: "+d), e-> System.err.println(e));
    System.out.println("##2##: Done: " + (System.currentTimeMillis() - cur));

}

private static boolean evaluator() {
    System.out.println("Inside Evaluator");
    return false;
}

Output with succeed=true - Observe the sequence of "Inside Evaluator" and "MonoJust inside"

##1##: Begin
Inside Evaluator
MonoJust inside 540
MonoJustStr inside: 542
##1##: one
##1##: Done: 542



##2##: Begin
MonoJust inside 544
Inside Evaluator
MonoJustStr inside: 544
##2##: one
##2##: Done: 544

Below is output with succeed = false - Note that evaluator is not called.

##1##: Begin
Inside Evaluator
MonoJust inside 565
java.lang.RuntimeException: aaa
##1##: Done: 567



##2##: Begin
MonoJust inside 569
java.lang.RuntimeException: aaa
##2##: Done: 569
like image 3
Pavan Kumar Avatar answered Oct 09 '22 15:10

Pavan Kumar