I have the below async tasks:
public class AsyncValidationTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
    public Mono<Void> execute(Object o);
}

public class AsyncSaveTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono of Object
    public Mono<Object> execute(Object o);
}
And below Service class:
public class AsyncService {
    private AsyncValidationTask validation;
    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   // Right now, the problem is that when validation completes successfully, it
                   // emits Mono.empty hence the flatMap chained below will not be invoked at all.
                   .flatMap(dontcare -> this.save.execute(o));
    }
}
As shown above, I tried to use flatMap to chain the AsyncSaveTask.execute call if AsyncValidationTask.execute completes successfully, but it won't work because nothing is emitted (Mono.empty) upon completion.
I also considered then to chain the second call, but it will always invoke the chained call regardless of the Mono.error produced by the first validation call.
How can I chain them properly?
Use .then to chain terminal-only sources
Use .then to chain your execution with a process that only sends a terminal signal.
Also, note that if you need to do something on the error signal, you have to put onErrorResume before your .then.
public class AsyncService {
    private AsyncValidationTask validation;
    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(this.save.execute(o));
    }
}
Use .defer to postpone Mono creation
To execute this.save.execute(o) only if validation succeeds, you have to wrap it in Mono.defer as well:
public class AsyncService {
    private AsyncValidationTask validation;
    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(Mono.defer(() -> this.save.execute(o)));
    }
}
Usually, it is not necessary, because Mono is a LAZY type which SHOULD start doing work only when a subscription happens (subscription == .subscribe()).
The implementation of Mono#then guarantees that the subscription to the Mono returned by this.save.execute starts RIGHT AFTER the Mono.defer(() -> this.validation.execute(o)) completes.
Execution can start earlier for only two reasons: it is done ON PURPOSE (e.g., business logic that deliberately behaves this way: caching, a hot source, etc.), OR this.save.execute(o) is implemented INCORRECTLY and starts doing work regardless of actual subscription.
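The difference between calling a method and subscribing to its Mono can be checked with a small sketch. The class ThenDeferDemo, its validate and save methods, and the saveCalls counter are hypothetical stand-ins for the tasks in the question. The sketch assumes reactor-core is on the classpath and uses block() only for demonstration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class ThenDeferDemo {
    // Counts invocations of the hypothetical save method (calls, not subscriptions).
    static final AtomicInteger saveCalls = new AtomicInteger();

    // Stand-in for AsyncValidationTask.execute: empty on success, error on failure.
    static Mono<Void> validate(Object o) {
        if (o == null) {
            return Mono.error(new IllegalArgumentException("invalid"));
        }
        return Mono.empty();
    }

    // Stand-in for AsyncSaveTask.execute.
    static Mono<Object> save(Object o) {
        saveCalls.incrementAndGet();
        return Mono.just("saved:" + o);
    }

    // then() waits for the completion signal of validation; the inner defer
    // postpones the call to save(o) until then() subscribes to it.
    static Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> validate(o))
                   .then(Mono.defer(() -> save(o)));
    }

    public static void main(String[] args) {
        System.out.println(validateAndSave("a").block());
        Object fallback = validateAndSave(null).onErrorReturn("rejected").block();
        System.out.println(fallback + ", save calls: " + saveCalls.get());
    }
}
```

With the inner Mono.defer removed, save(o) would be called while the chain is being assembled, so the counter would also increase for the rejected input even though the resulting Mono is never subscribed.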
In general, it is good practice to ensure that any API that does work and exposes that work as a Publisher (e.g. Mono | Flux) is lazy.
This means the API creator MUST ensure that the work executes only once the user has subscribed to the given Publisher instance.
For example, if your async API creates a CompletableFuture underneath, it is worth manually wrapping the CompletableFuture creation in Mono.defer, or using the appropriate method overload, e.g. Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier).
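Why the wrapping matters can be illustrated with the JDK alone. In the sketch below, a plain Supplier plays the role that Mono.defer (or the Supplier overload of Mono.fromFuture) plays in Reactor. The class LazyFutureDemo, its methods, and the started counter are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyFutureDemo {
    // Counts how many times the hypothetical async job has been started.
    static final AtomicInteger started = new AtomicInteger();

    // Eager API: the job is submitted as soon as this method is called.
    static CompletableFuture<String> startJob() {
        started.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> "saved");
    }

    // Lazy wrapper: nothing is submitted until the caller invokes get().
    static Supplier<CompletableFuture<String>> lazyJob() {
        return LazyFutureDemo::startJob;
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<String>> job = lazyJob();
        System.out.println("started after wrapping: " + started.get());
        String result = job.get().join();
        System.out.println("result: " + result + ", started: " + started.get());
    }
}
```

Calling startJob() directly starts the work even if nobody ever consumes the future. Handing out the Supplier instead moves that decision to the consumer, which is the same contract a lazy Publisher offers to its subscriber.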
Let's consider how to make a regular ThreadPool task submission Reactive.
interface Executor {
    <T> Future<T> execute(Callable<T> runnable);
}

So, in order to make Executor reactive, we have to create something like the following:

interface ReactiveExecutor {
    <T> Mono<T> execute(Callable<T> runnable);
}
The following is a possible implementation of such an adapter which works:
class ReactiveExecutorAdapter {
    final Executor delegate;
    ...

    <T> Mono<T> execute(Callable<T> runnable) {
        // Stateful processor that holds the result for a later subscriber
        MonoProcessor<T> result = MonoProcessor.create();
        // The task is submitted immediately, before anyone subscribes
        Future<T> task = delegate.execute(() -> {
            T value = runnable.call();
            result.onNext(value);
            result.onComplete();
            return value;
        });
        return result.doOnCancel(() -> task.cancel(true));
    }
}
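Such an implementation will certainly work. However, it has a few critical issues:
1. Execution starts on method invocation, which contradicts the lazy behavior expected of a reactive Publisher.
2. Since execution starts before the actual subscription, we have to create a stateful Mono, which supports later subscription.
3. The implementation does not handle the case when there are no subscribers at all (e.g., execution has started but no .subscribe method happened, so we get a value leak which is impossible to handle).
4. To prevent the cases above, it is necessary to wrap every call of Mono execute(..) with Mono.defer outside of the implementation (see the original problem in the question). Consequently, an API user can easily shoot themselves in the foot by forgetting to wrap the execution with an extra .defer.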
To fix this, it is basically enough to move the Mono.defer into the library internals. It makes life much easier for API users, since they don't have to think about when deferring is necessary (hence fewer possible issues).
For example, the simplest solution for our Reactive Executor can be the following:
class ReactiveExecutorAdapter {
    final Executor delegate;
    ...

    <T> Mono<T> execute(Callable<T> runnable) {
        // Nothing inside defer runs until a subscriber arrives
        return Mono.defer(() -> {
            MonoProcessor<T> result = MonoProcessor.create();
            Future<T> task = delegate.execute(() -> {
                T value = runnable.call();
                result.onNext(value);
                result.onComplete();
                return value;
            });
            return result.doOnCancel(() -> task.cancel(true));
        });
    }
}
By just deferring the execution, we solve at least one problem for sure: the value is no longer leaked.
However, in order to solve all the possible problems in this particular case, we may use Mono.create, which is designed for adapting async APIs:
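class ReactiveExecutorAdapter {
    final Executor delegate;
    ...

    <T> Mono<T> execute(Callable<T> runnable) {
        return Mono.create(monoSink -> {
            Future<T> task = delegate.execute(() -> {
                try {
                    T value = runnable.call();
                    // Emits the value and completes the Mono
                    monoSink.success(value);
                    return value;
                } catch (Exception e) {
                    // Propagate failures to the subscriber instead of losing them
                    monoSink.error(e);
                    throw e;
                }
            });
            // Cancel the underlying task when the subscriber cancels
            monoSink.onCancel(() -> task.cancel(true));
        });
    }
}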
Using Mono.create, we have a guarantee of lazy execution for every subscriber.
Also, using the MonoSink API, we can quickly hook into all the essential signals from the subscriber.
Finally, Mono.create ensures that, whatever happens, the value will be discarded appropriately.
With such an API, it is no longer necessary to use defer in every case.
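The per-subscriber laziness can be observed with a runnable sketch. It swaps the custom Executor interface from above for the JDK's ExecutorService, which is an assumption made only to keep the example self-contained. MonoCreateAdapterDemo and the submissions counter are hypothetical names, and reactor-core is assumed to be on the classpath.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class MonoCreateAdapterDemo {
    // Counts how many tasks were actually handed to the pool.
    static final AtomicInteger submissions = new AtomicInteger();

    static <T> Mono<T> execute(ExecutorService pool, Callable<T> callable) {
        // The lambda passed to create runs once per subscription, not per call.
        return Mono.create(sink -> {
            submissions.incrementAndGet();
            Future<?> task = pool.submit(() -> {
                try {
                    sink.success(callable.call());
                } catch (Exception e) {
                    sink.error(e);
                }
            });
            // Cancel the pool task if the subscriber cancels.
            sink.onCancel(() -> task.cancel(true));
        });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Mono<Integer> mono = execute(pool, () -> 42);
            System.out.println("submitted before subscribe: " + submissions.get());
            System.out.println("value: " + mono.block());
            mono.block();
            System.out.println("submitted after two subscriptions: " + submissions.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling execute alone submits nothing, and each subscription (here, each block() call) triggers its own submission, so callers no longer need an outer Mono.defer.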