Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java Reactor - conditional stream execution

I was wondering how to create "logical stream" using Reactor.

Lets assume that I want to implement following scenario:

As input I have object to save in database. As output I would like to get Mono representing execution message.

  • Option 1: if object to save has all fields filled then I perform additionall operations, save it to database and finally return "Success" message

  • Option 2: if object to save has at least one field not filled I return "Error"

I have created such code:

Mono<String> message = Mono.just(new User("Bob the Reactor master")) // user with name = can be saved
    .flatMap(user -> {
        if(user.getName() != null && user.getName().length() > 1){
            // Perform additional operations e.g. user.setCreatedDate(new Date())
            // Save to repository e.g. repository.save(user)
            return Mono.just("Success!");
        }
        else{
            return Mono.just("Error!");
        }
    })
    .doOnNext(System.out::println); // print stream result

message.subscribe();

Is this code 100% reactive (has all its benefits)? If no then what it will look like?

like image 894
Wicia Avatar asked Oct 11 '18 20:10

Wicia


People also ask

What is the difference between mono and flux Java?

Mono is more relatable to the Optional class in Java since it contains 0 or 1 value, and Flux is more relatable to List since it can have N number of values.

How do you use onErrorResume?

There are three ways that we can use onErrorResume to handle errors: Compute a dynamic fallback value. Execute an alternative path with a fallback method. Catch, wrap and re-throw an error, e.g., as a custom business exception.

How do you get an object from mono without blocking?

A non-blocking way would be via one of the overloaded subscribe() methods. In this example, we will use the subscribe(Consumer<? super T> consumer) to get the data from Mono asynchronously. With subscribe(), the current thread will not be blocked waiting for the Publisher to emit data.

What does flux fromIterable do?

Flux. fromIterable : This is used to build a stream from collections. All collections are of the Iterable<T> type, which can be passed to this to generate the intended stream.


1 Answers

The answer depends on your commented repository.

  • Repository is non-blocking and returns Mono or Flux

    You should subscribe it then return Success Mono. In your if statement:

    return repository.save(user).then(Mono.just("Success!"));
    
  • Repository is blocking

    You should make your repository call non-blocking moving its execution to separate thread. Reactor way is to wrap it with Mono and subscribe on elastic scheduler or your custom scheduler.

    Mono<String> message = Mono.just(new User("Bob the Reactor master"))
            .filter(user -> user.getName() != null && user.getName().length() > 1)
            .map(user -> user) // Perform additional operations e.g. user.setCreatedDate(new Date())
            .flatMap(user -> Mono.fromRunnable(() -> repository.save(user))
                    .subscribeOn(Schedulers.elastic())
                    .then(Mono.just("Success!")))
            .switchIfEmpty(Mono.just("Error!"));
    
like image 116
Alexander Pankin Avatar answered Oct 18 '22 23:10

Alexander Pankin