I have a use case where I need to send an email to users. First I create the email body.
Mono<String> emailBody = ...cache();
And then I select users and send the email to them:
Flux.fromIterable(userRepository.findAllByRole(Role.USER))
    .map(User::getEmail)
    .doOnNext(email -> sendEmail(email, emailBody.block(), massSendingSubject))
    .subscribe();
What I don't like
Instead of take(1), you could use next(). This will transform the Flux into a valued Mono by taking the first emitted item, or an empty Mono if the Flux is empty itself.
Spring WebFlux internally uses Project Reactor and its publisher implementations, Flux and Mono. Mono: a publisher that can emit 0 or 1 element. Flux: a publisher that can emit 0..N elements.
A Flux object represents a reactive sequence of 0..N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.
There are several issues in this code sample. I'll assume that this is a reactive web application.
First, it's not clear how you are creating the email body: are you fetching things from a database or a remote service? If it is mostly CPU-bound (and not I/O), then you don't need to wrap it in a reactive type. If it does need to be wrapped in a Publisher and the email content is the same for all users, using the cache operator is not a bad choice.
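To illustrate the point about cache, here is a minimal sketch, assuming reactor-core is on the classpath. The class name, the generateBody helper, and the counter are hypothetical stand-ins, since the question does not show how the body is actually built. It shows that a cached Mono runs its source once and replays the value to later subscribers.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class CachedEmailBody {
    // Counts how many times the (hypothetical) body generation actually runs.
    static final AtomicInteger generations = new AtomicInteger();

    // Hypothetical stand-in for whatever produces the email body.
    static String generateBody() {
        generations.incrementAndGet();
        return "Hello from the newsletter";
    }

    // cache() makes the Mono remember its result after the first subscription.
    static Mono<String> cachedBody() {
        return Mono.fromCallable(CachedEmailBody::generateBody).cache();
    }

    public static void main(String[] args) {
        Mono<String> body = cachedBody();
        // block() is used here only because this is a plain main method,
        // not a reactive pipeline.
        String first = body.block();
        String second = body.block();
        System.out.println(first.equals(second)); // prints "true"
        System.out.println(generations.get());    // prints "1"
    }
}
```

Without cache(), each subscription would call generateBody again, which matters if the body comes from a database or a remote service.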
Also, Flux.fromIterable(userRepository.findAllByRole(Role.USER)) suggests that you're calling a blocking repository from a reactive context.
You should never do heavy I/O operations in a doOn*** operator. Those are meant for logging or light side effects. The fact that you need to call .block() inside it is another clue that you'll block your whole reactive pipeline.
Last one: you should not call subscribe anywhere in a web application. If this is bound to an HTTP request, you're basically triggering the reactive pipeline with no guarantee about resources or completion. Calling subscribe triggers the pipeline but does not wait until it's complete (this method returns a Disposable).
A more "typical" sample of that would look like:
Flux<User> users = userRepository.findAllByRole(Role.USER);
String emailBody = emailContentGenerator.createEmail();
// sendEmail() should return Mono<Void> to signal when the send operation is done
Mono<Void> sendEmailsOperation = users
    .flatMap(user -> sendEmail(user.getEmail(), emailBody, subject))
    .then();
// something else should subscribe to that reactive type,
// you could plug that as a return value of a Controller for example
If you're somehow stuck with blocking components (the sendEmail one, for example), you should schedule those blocking operations on a specific scheduler to avoid blocking your whole reactive pipeline. For that, look at the Schedulers section in the Reactor reference documentation.
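As a rough sketch of that last suggestion, a blocking send could be wrapped and moved onto a scheduler intended for blocking work. This assumes a reasonably recent reactor-core (one that provides Schedulers.boundedElastic()). The class, the sendEmailBlocking method, and the sent list are hypothetical and only stand in for a real mail client.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingEmailSender {
    // Records recipients so the effect is observable in this sketch.
    static final List<String> sent = new CopyOnWriteArrayList<>();

    // Hypothetical blocking call, standing in for the sendEmail from the question.
    static void sendEmailBlocking(String to, String body, String subject) {
        sent.add(to);
    }

    // Wraps the blocking call and runs it on the bounded elastic scheduler,
    // so it does not occupy the threads that drive the rest of the pipeline.
    static Mono<Void> sendEmail(String to, String body, String subject) {
        return Mono.<Void>fromRunnable(() -> sendEmailBlocking(to, body, subject))
                .subscribeOn(Schedulers.boundedElastic());
    }

    // Completes once every send has completed.
    static Mono<Void> sendAll(Flux<String> emails, String body, String subject) {
        return emails.flatMap(email -> sendEmail(email, body, subject)).then();
    }

    public static void main(String[] args) {
        // block() is acceptable in a plain main method; in a web application
        // the returned Mono<Void> would be handed back to the framework instead.
        sendAll(Flux.just("a@example.com", "b@example.com"), "Hello", "News").block();
        System.out.println(sent.size()); // prints "2"
    }
}
```

Because flatMap subscribes to the inner Monos concurrently, the sends may run in parallel and in any order; concatMap would be the alternative if ordering matters.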