I was wondering how to create a "logical stream" using Reactor.
Let's assume that I want to implement the following scenario:
As input I have an object to save in a database. As output I would like to get a Mono representing the execution message.
Option 1: if the object to save has all fields filled in, I perform additional operations, save it to the database, and finally return a "Success" message.
Option 2: if the object to save has at least one field not filled in, I return "Error".
I have written the following code:
Mono<String> message = Mono.just(new User("Bob the Reactor master")) // user has a name, so it can be saved
    .flatMap(user -> {
        if (user.getName() != null && user.getName().length() > 1) {
            // Perform additional operations e.g. user.setCreatedDate(new Date())
            // Save to repository e.g. repository.save(user)
            return Mono.just("Success!");
        } else {
            return Mono.just("Error!");
        }
    })
    .doOnNext(System.out::println); // print stream result
message.subscribe();
Is this code 100% reactive (does it get all the benefits)? If not, what should it look like?
Mono is more relatable to the Optional class in Java since it contains 0 or 1 value, and Flux is more relatable to List since it can have N number of values.
There are three ways that we can use onErrorResume to handle errors: Compute a dynamic fallback value. Execute an alternative path with a fallback method. Catch, wrap and re-throw an error, e.g., as a custom business exception.
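The three uses of onErrorResume listed above could be sketched roughly as follows. This assumes reactor-core is on the classpath; failingSave(), loadFromCache(), and BusinessException are hypothetical names invented for the illustration, not part of Reactor.

```java
import reactor.core.publisher.Mono;

class OnErrorResumeSketch {

    // Hypothetical custom business exception, used only for this illustration.
    static final class BusinessException extends RuntimeException {
        BusinessException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical source that always fails, standing in for a real call.
    static Mono<String> failingSave() {
        return Mono.error(new IllegalStateException("database unavailable"));
    }

    // Hypothetical fallback method.
    static Mono<String> loadFromCache() {
        return Mono.just("cached value");
    }

    // 1. Compute a dynamic fallback value from the error.
    static Mono<String> dynamicFallback() {
        return failingSave().onErrorResume(e -> Mono.just("Error: " + e.getMessage()));
    }

    // 2. Switch to an alternative path through a fallback method.
    static Mono<String> fallbackMethod() {
        return failingSave().onErrorResume(e -> loadFromCache());
    }

    // 3. Catch, wrap, and re-throw as a custom business exception.
    static Mono<String> wrapAndRethrow() {
        return failingSave().onErrorResume(e -> Mono.error(new BusinessException("Save failed", e)));
    }

    public static void main(String[] args) {
        // block() is used here only to keep the demo short.
        System.out.println(dynamicFallback().block());
        System.out.println(fallbackMethod().block());
        wrapAndRethrow().subscribe(
                value -> System.out.println(value),
                error -> System.out.println(error.getClass().getSimpleName() + ": " + error.getMessage()));
    }
}
```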
A non-blocking way would be via one of the overloaded subscribe() methods. In this example, we will use the subscribe(Consumer<? super T> consumer) to get the data from Mono asynchronously. With subscribe(), the current thread will not be blocked waiting for the Publisher to emit data.
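A minimal sketch of subscribe(Consumer) might look like the following, assuming reactor-core is available. The class and method names are made up for the illustration, and the CountDownLatch exists only so that the demo can wait for the value before it exits; a real application would normally not need it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class SubscribeSketch {

    static String subscribeAndCollect() {
        AtomicReference<String> received = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Mono.fromCallable(() -> "Success!")
                // Produce the value on a scheduler thread instead of the caller's thread.
                .subscribeOn(Schedulers.boundedElastic())
                // The consumer is invoked when the value is emitted.
                .subscribe(value -> {
                    received.set(value);
                    done.countDown();
                });

        // subscribe() does not wait for the value. We wait on the latch
        // purely so the demo can return what the consumer received.
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(subscribeAndCollect());
    }
}
```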
Flux.fromIterable: This is used to build a stream from a collection. All collections implement Iterable<T>, so any of them can be passed to it to generate the intended stream.
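As a small illustration of Flux.fromIterable, assuming reactor-core is available, the sketch below turns a List into a Flux, transforms each element, and collects the result. The class and method names are invented for the example, and block() is used only to keep the demo compact.

```java
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

class FromIterableSketch {

    static List<String> upperCaseNames(List<String> names) {
        return Flux.fromIterable(names)   // List<String> is an Iterable<String>
                .map(String::toUpperCase) // transform each emitted element
                .collectList()            // gather the elements into a Mono<List<String>>
                .block();                 // blocking here is for demonstration only
    }

    public static void main(String[] args) {
        System.out.println(upperCaseNames(Arrays.asList("bob", "alice")));
    }
}
```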
The answer depends on the repository call that you have commented out.
Repository is non-blocking and returns Mono or Flux
You should chain the save call so that it is subscribed as part of the pipeline, and then return the success Mono. In your if statement:
return repository.save(user).then(Mono.just("Success!"));
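Put together, the non-blocking case could look something like the sketch below. It assumes reactor-core is on the classpath; User and UserRepository here are hypothetical stand-ins for the question's types, with save assumed to return a Mono. thenReturn("Success!") is used as a shorthand for then(Mono.just("Success!")).

```java
import reactor.core.publisher.Mono;

class ReactiveSaveSketch {

    // Hypothetical stand-in for the User class from the question.
    static final class User {
        private final String name;

        User(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Hypothetical non-blocking repository: save returns a Mono.
    interface UserRepository {
        Mono<User> save(User user);
    }

    static Mono<String> saveUser(User user, UserRepository repository) {
        return Mono.just(user).flatMap(u -> {
            if (u.getName() != null && u.getName().length() > 1) {
                // The save is part of the chain, so it runs when the result is subscribed.
                return repository.save(u).thenReturn("Success!");
            }
            return Mono.just("Error!");
        });
    }

    public static void main(String[] args) {
        UserRepository inMemory = u -> Mono.just(u); // trivial fake for the demo
        saveUser(new User("Bob the Reactor master"), inMemory).subscribe(System.out::println);
        saveUser(new User(null), inMemory).subscribe(System.out::println);
    }
}
```

Because the save Mono is returned from flatMap rather than subscribed separately, a failure in save propagates to whoever subscribes to the resulting Mono<String>.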
Repository is blocking
You should make the repository call non-blocking by moving its execution to a separate thread. The Reactor way is to wrap it in a Mono and subscribe on a scheduler meant for blocking work, such as Schedulers.boundedElastic() (which replaced Schedulers.elastic(), deprecated since Reactor 3.4 and removed in later releases), or on your own custom scheduler.
Mono<String> message = Mono.just(new User("Bob the Reactor master"))
    .filter(user -> user.getName() != null && user.getName().length() > 1)
    .map(user -> user) // Perform additional operations e.g. user.setCreatedDate(new Date())
    .flatMap(user -> Mono.fromRunnable(() -> repository.save(user))
        .subscribeOn(Schedulers.boundedElastic()) // run the blocking save off the calling thread
        .then(Mono.just("Success!")))
    .switchIfEmpty(Mono.just("Error!"));
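For anyone who wants to run the blocking variant end to end, here is a self-contained sketch under a few assumptions: reactor-core 3.3 or later is on the classpath, and User and BlockingUserRepository are hypothetical stand-ins for the real types. The fake repository in main prints the thread it runs on, so you can check that the save does not happen on the calling thread.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class BlockingSaveSketch {

    // Hypothetical stand-in for the User class from the question.
    static final class User {
        private final String name;

        User(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Hypothetical blocking repository, e.g. one backed by JDBC.
    interface BlockingUserRepository {
        void save(User user);
    }

    static Mono<String> saveUser(User user, BlockingUserRepository repository) {
        return Mono.just(user)
                // Invalid users are dropped, leaving the Mono empty.
                .filter(u -> u.getName() != null && u.getName().length() > 1)
                .flatMap(u -> Mono.fromRunnable(() -> repository.save(u))
                        // Run the blocking call on a scheduler intended for blocking work.
                        .subscribeOn(Schedulers.boundedElastic())
                        .then(Mono.just("Success!")))
                // An empty Mono means validation failed.
                .switchIfEmpty(Mono.just("Error!"));
    }

    public static void main(String[] args) {
        BlockingUserRepository fake = u ->
                System.out.println("saving on " + Thread.currentThread().getName());
        // block() is used only so the demo waits for the result before exiting.
        System.out.println(saveUser(new User("Bob the Reactor master"), fake).block());
        System.out.println(saveUser(new User(null), fake).block());
    }
}
```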