
Project Reactor conditional execution

I have an object to save (to MongoDB), but before saving it I need to check that some conditions are true.

The object contains the IDs of other objects. It looks like this:

    "object": {
       "id": "123",
       "subobject1": { "id": "1" },
       "subobject2": { "id": "2" }
    }

The subobjects contain only an id; the rest of their information is stored in another collection, so I have to check that the referenced documents exist.

In blocking style I can do something like this:

    if (!languageRepository.exists(Example.of(wordSet.getNativeLanguage())).block()) {
        throw new RuntimeException("Native language doesn't exist");
    }

    if (!languageRepository.exists(Example.of(wordSet.getTargetLanguage())).block()) {
        throw new RuntimeException("Target language doesn't exist");
    }

and only then can I save my object:

    return wordSetRepository.save(wordSet);

How can I do it in "reactive" style without blocking?

asked Apr 16 '18 at 15:04 by Вадим Парафенюк



1 Answer

If you want to propagate distinct errors for the native and target language cases, you'll need to perform async filtering inside a flatMap:

    objectFlux.flatMap(o ->
        Mono.just(o)
            // filterWhen takes a function returning a Publisher<Boolean>, hence the lambda
            .filterWhen(ws -> languageRepository.exists(...)) // native
            .switchIfEmpty(Mono.error(new RuntimeException("Native language doesn't exist")))
            .filterWhen(ws -> languageRepository.exists(...)) // target
            .switchIfEmpty(Mono.error(new RuntimeException("Target language doesn't exist")))
        )
        // only elements that passed both checks reach the save step
        .flatMap(wordSetRepository::save);

The async filtering inside the flatMap ensures that if a test doesn't pass, the inner sequence is empty. This in turn lets us detect that case with switchIfEmpty and propagate the appropriate error. If both tests pass, the original o is propagated to the main sequence.

The second flatMap takes it from there: it only receives the elements that passed both filters and saves them to the database.

Note that the first element that fails a filter terminates the whole sequence with an error (the blocking code behaved the same way, since it threw an exception).
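
For the single wordSet from the question, the same chain can start from Mono.just(wordSet) instead of a Flux. Below is a minimal sketch of that variant, assuming reactor-core is on the classpath and Java 16+ for records. The Language and WordSet records, the languageExists and save helpers, and the in-memory set of known IDs are hypothetical stand-ins for the question's domain classes and repositories, not the real Spring Data API.

```java
import java.util.Set;
import reactor.core.publisher.Mono;

public class WordSetValidation {

    // Hypothetical stand-ins for the question's domain types.
    record Language(String id) {}
    record WordSet(String id, Language nativeLanguage, Language targetLanguage) {}

    // Hypothetical in-memory data replacing the languages collection.
    static final Set<String> KNOWN_LANGUAGE_IDS = Set.of("1", "2");

    // Hypothetical stand-in for languageRepository.exists(...).
    static Mono<Boolean> languageExists(Language language) {
        return Mono.fromSupplier(() -> KNOWN_LANGUAGE_IDS.contains(language.id()));
    }

    // Hypothetical stand-in for wordSetRepository.save(...).
    static Mono<WordSet> save(WordSet wordSet) {
        return Mono.just(wordSet);
    }

    // Builds the pipeline; nothing executes until the returned Mono is subscribed to.
    static Mono<WordSet> validateAndSave(WordSet wordSet) {
        return Mono.just(wordSet)
            // An unknown native language empties the Mono...
            .filterWhen(ws -> languageExists(ws.nativeLanguage()))
            // ...and the empty Mono is turned into a specific error.
            .switchIfEmpty(Mono.error(new RuntimeException("Native language doesn't exist")))
            .filterWhen(ws -> languageExists(ws.targetLanguage()))
            .switchIfEmpty(Mono.error(new RuntimeException("Target language doesn't exist")))
            // Reached only when both checks passed.
            .flatMap(WordSetValidation::save);
    }

    public static void main(String[] args) {
        WordSet valid = new WordSet("123", new Language("1"), new Language("2"));
        WordSet invalid = new WordSet("124", new Language("1"), new Language("9"));

        validateAndSave(valid).subscribe(
            saved -> System.out.println("Saved " + saved.id()),
            error -> System.out.println("Failed: " + error.getMessage()));

        validateAndSave(invalid).subscribe(
            saved -> System.out.println("Saved " + saved.id()),
            error -> System.out.println("Failed: " + error.getMessage()));
    }
}
```

Once the first check fails, the error signal bypasses the second filterWhen and the save step, so the caller sees only the message for the first condition that was not met. In a WebFlux handler you would typically return the Mono from validateAndSave rather than subscribing to it yourself.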

answered Sep 20 '22 at 22:09 by Simon Baslé