
flatMap() vs subscribe() in Spring WebFlux

I am new to Spring WebFlux and am trying to convert my Spring MVC application to WebFlux. My service returns a Mono (mono below) that emits the stores, and I collect them into a list:

    List<Store> stores = new ArrayList<>();

When I do:

    mono.subscribe(stores::addAll);
    dataexchange.put("stores", stores);
    return Mono.just(dataexchange);

Then stores is an empty list in the response. However, I can verify that the subscribe() callback does run, only after the response has already been returned.

When I do:

    return mono.flatMap(response -> {
        dataexchange.put("stores", response);
        return Mono.just(dataexchange);
    });

Then stores is populated in the response.

Can someone please explain the difference between the two? Is flatMap blocking? Thanks in advance!

asked Feb 12 '19 10:02 by XYZ




1 Answer

    mono.subscribe(stores::addAll);

is asynchronous here. The call only tells the Mono that it can start evaluating and then returns; stores::addAll runs later, once the Mono emits its value.

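Your code then continues and reads stores right away, most likely before the Mono has emitted anything. That is why the response contains an empty list, and why you can see the subscriber run after the response has been returned.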
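To make the timing visible, here is a minimal sketch, assuming reactor-core is on the classpath. The class SubscribeTimingDemo and the method findStores() are hypothetical stand-ins for the service in the question, and the 200 ms delay only simulates a slow lookup.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono;

public class SubscribeTimingDemo {

    // Hypothetical stand-in for the service call: emits the list after 200 ms,
    // roughly the way a remote lookup would.
    static Mono<List<String>> findStores() {
        return Mono.just(List.of("store-1", "store-2"))
                   .delayElement(Duration.ofMillis(200));
    }

    // Size of the list as seen directly after subscribe() has returned.
    static int sizeRightAfterSubscribe() {
        List<String> stores = new CopyOnWriteArrayList<>();
        findStores().subscribe(stores::addAll); // starts the Mono, does not wait for it
        return stores.size();
    }

    // Size of the list once the subscriber callback has actually run.
    static int sizeAfterCallback() {
        List<String> stores = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        findStores().subscribe(list -> {
            stores.addAll(list);
            done.countDown();
        });
        try {
            done.await(2, TimeUnit.SECONDS); // demo only: wait for the callback
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stores.size();
    }

    public static void main(String[] args) {
        System.out.println("right after subscribe: " + sizeRightAfterSubscribe());
        System.out.println("after the callback ran: " + sizeAfterCallback());
    }
}
```

The first method mirrors the code from the question: it reads the list before the delayed value can have arrived. The latch in the second method is only there to show that the callback does fire eventually; it is not a pattern to use in a request handler.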

So, how can you fix this?

You can block until the Mono has completed:

    mono.doOnNext(stores::addAll).block();

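Of course, this defeats the purpose of reactive programming. You are blocking the calling thread until the action completes, which can be achieved without Reactor in a much simpler fashion. In WebFlux the calling thread is usually a request-handling thread that is meant to stay free for other requests.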


The right way is to make the rest of your code reactive too, from head to toe. This is what your second example does: the call to dataexchange.put is part of the Mono pipeline, so it runs only once the stores are available. flatMap does not block; it just defers that step until the value arrives.
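A handler written that way might look like the sketch below, assuming reactor-core is on the classpath. ReactiveHandlerSketch, findStores() and handle() are hypothetical names; in a real WebFlux controller you would return the Mono from the handler method and let the framework subscribe to it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Mono;

public class ReactiveHandlerSketch {

    // Hypothetical stand-in for the service from the question.
    static Mono<List<String>> findStores() {
        return Mono.just(List.of("store-1", "store-2"));
    }

    // The response body is assembled inside the pipeline, so it is built
    // only when the stores have actually been emitted.
    static Mono<Map<String, Object>> handle() {
        return findStores().map(stores -> {
            Map<String, Object> dataexchange = new HashMap<>();
            dataexchange.put("stores", stores);
            return dataexchange;
        });
    }

    public static void main(String[] args) {
        // block() is used here only so the demo can print the result.
        System.out.println(handle().block());
    }
}
```

Since putting a value into a map is a plain synchronous step, map is enough here. flatMap with Mono.just, as in the question, gives the same result; flatMap becomes necessary when the step itself returns a Mono, for example a second service call.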

The important lesson is that operators like map or flatMap do not operate on the result of the Mono. They create a new Mono that adds another transformation to the execution of the original one. Until something subscribes to that Mono (in WebFlux, the framework does this for the Mono you return), the map or flatMap operations do nothing at all.
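The laziness is easy to check with a counter. This is a small sketch, assuming reactor-core is on the classpath; LazyOperatorsDemo and run() are made-up names for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class LazyOperatorsDemo {

    // Returns {mapper calls after assembly, emitted value, mapper calls after subscribing}.
    static int[] run() {
        AtomicInteger mapperCalls = new AtomicInteger();

        // Assembling the pipeline only describes the work; nothing runs yet.
        Mono<Integer> pipeline = Mono.just(20)
                .map(n -> { mapperCalls.incrementAndGet(); return n + 1; })
                .flatMap(n -> Mono.just(n * 2));

        int afterAssembly = mapperCalls.get();

        // block() subscribes and waits; used here only to keep the demo short.
        Integer value = pipeline.block();
        int afterSubscribe = mapperCalls.get();

        return new int[] { afterAssembly, value, afterSubscribe };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("mapper calls after assembly:    " + r[0]); // 0
        System.out.println("emitted value:                  " + r[1]); // 42
        System.out.println("mapper calls after subscribing: " + r[2]); // 1
    }
}
```

The mapper is not called while the pipeline is being built, only once a subscription exists.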


I hope this helps.

answered Sep 28 '22 02:09 by Markus Appel