Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java reactor - chain Mono<Void> with another async task producing Mono<Object>

I have the below async tasks:

public class AsyncValidationTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
    public Mono<Void> execute(Object o);
}
public class AsyncSaveTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono of Object
    public Mono<Object> execute(Object o);
}

And below Service class:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   // Right now, the problem is that when validation completes successfully, it 
                   // emits Mono.empty hence the flatMap chained below will not be invoked at all.
                   .flatMap(dontcare -> this.save.execute(o))
    }
}

As shown above, I tried to use flatMap to chain the AsyncSaveTask.execute call if the AsyncValidationTask.execute completes successfully, it won't work because nothing is emitted (Mono.empty) upon completion.

I also consider then to chain the second call, but it will always invoke the chained call regardless of the Mono.error produced by the first validation call.

How can I chain them properly?

like image 233
GJ. Avatar asked Mar 24 '20 17:03

GJ.


People also ask

Is Mono asynchronous?

Appendix. In order to dig deeper into the core features of Reactor, head to Reactor Core Features to learn: More about Reactor's reactive types in the Flux , an Asynchronous Sequence of 0-N Items and Mono , an Asynchronous 0-1 Result sections.

How do you get an object from mono without blocking?

You can retrieve the result from Mono in a non-blocking way by subscribing a Consumer that will consume all the sequences. The Consumer code block executes asynchronously only after Mono completes. For example, the following subscribe method prints the result of Mono to the console.

What is mono void in Java?

The Mono<Void> is a special case where Void is a class that you can never instantiate. Since there's no such thing as an instance of Void , the Mono can never emit a value before its completion signal. private static Mono<Void> monoVoid() { System.out.println("monoVoid called"); return Mono.just("Hello").then(); }

What is the difference between flux and Mono?

Mono vs Flux in Reactive Stream A Flux object represents a reactive sequence of 0.. N items, whereas a Mono object represents a single value or an empty (0..1) result. Most times, you expect exactly one result or no (zero) result, and not a collection that contains possibly multiple results.


1 Answers

.then for terminal only sources to chain

Use .then, in order to chain your execution with the process, which only sends a terminal signal.

Also, pay attention, if you need to do something on the error signal, then you have to accompany your .then with onErrorResume beforehand.

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(this.save.execute(o))
    }
}

.defer in order to postpone mono creation

In order to execute the this.save.execute(o) only in case of successful validation, you have to wrap it in Mono.defer as well:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(Mono.defer(() -> this.save.execute(o)))
    }
}

Usually, it is not necessary, because Mono is a LAZY type which SHOULD start doing work only in case subscription happened (subscription == .subscribe()).

The implementation of Mono#then guarantees that subscription to Mono returned by the this.save.execute the method starts RIGHT AFTER the Mono.defer(() -> this.validation.execute(o)) completed.

The only reason why execution may start earlier may be a PURPOSE (e.g., business logic which provides such behavior on purpose - caching; hot source, etc.) OR an INCORRECT implementation of the this.save.execute(o) which starts doing work regardless of actual subscription.

Design your Implementation properly

In general, it is a good practice to ensure that API which does work and expose that work as a Publisher (e.g. Mono | Flux) is Lazy.

It means that the API creator MUST ensure that the work execution happens only in case the user has subscribed to the given Publisher instance.

For example, if your async API does CompletableFuture creation underneath, it worth to manually wrap your CompletableFuture creation into Mono.defer or to use proper method extension, e.g Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)

Executors example

Let's consider how to make a regular ThreadPool task submission Reactive.

interface Executor  {
  Future<T> execute(Callable<T> runnable); 
}

So, in order to make Executor reactive, we have to create something like the following:

interface ReactiveExecutor {
   Mono<T> execute(Callable<T> runnable);
}

Incorrect Implementation

The following is a possible implementation of such an adapter which works:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      MonoProcessor<T> result = MonoProcessor.create();
      Future<T> task = delegate.execute(() -> {
          T value = runnable.call();
          result.onNext(value);
          result.onComplet();
          return value;
      });

      return result.doOnCancel(() -> task.cancel());
   }
}

Definitely, such an implementation will be working. However, it has a few critical issues:

  1. Execution starts on the method invocation (which somewhat contradicts to lazy behavior of reactive streams Publisher)
  2. Since execution starts before the actual task subscription, we have to create a stateful Mono, which supports later subscription.
  3. This implementation does not handle the case when there are no Subscribers at all (e.g., execution has started, but no .subscribe method happened (then we got value leak which impossible to handle)
  4. It is too hacky in general to be a solution. Also, in order to prevent all the previously mentioned cases, it is necessary to wrap every call on Mono execute(..) with Mono.defer outside of the implementation (see the original problem in the question). Subsequently, it leads to the fact that an API user can easily 'shoot your self in the foot' forgetting to wrap execution with an extra .defer

So, how to solve it, then?

Basically, it is enough to move the Mono.defer into the library internals. It will make the life of the API users much easier since they don't have to think when it is necessary to use deferring (hence - less possible issues).

For example, the simplest solution for our Reactive Executor can be the following:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      Mono.defer(() -> {
          MonoProcessor<T> result = MonoProcessor.create();
          Future<T> task = delegate.execute(() -> {
              T value = runnable.call();
              result.onNext(value);
              result.onComplet();
              return value;
          });

          return result.doOnCancel(() -> task.cancel());
     })
   }
}

By just deferring the execution, we can solve at least one problem for sure - ensure value is not leaked anymore.

But, How to solve it correctly?

However, in order to solve all the possible problems in that particular case, we may use Mono.create which is properly designed for adopting async API:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      Mono.crete(monoSink -> {

          Future<T> task = delegate.execute(() -> {
              T value = runnable.call();
              monoSink.complete(value);
          });

          monoSink.doOnCancel(task::cancel);
     })
   }
}

using Mono.create we have a guarantee of lazy execution on every subscriber. Also, using MonoSink API, we can quickly hook on all the essential signals from the subscriber. Finally, Mono.create ensures that in case of anything, the value will be discarded appropriately.

Finally, having such an API it is not necessary to use defer for all the cases

like image 144
Oleh Dokuka Avatar answered Sep 26 '22 10:09

Oleh Dokuka