Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Reactor: Mono.zip fails on empty Mono

I am using Spring Reactor 3.1.0.M3 and have a use case where I need to merge Mono's from multiple sources. I found that if one of the Monos is an empty Mono, zip fails without an error.

Example:

Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.empty();

Mono<String> combined = Mono.zip(strings -> {
    StringBuffer sb = new StringBuffer();
    for (Object string : strings) {
        sb.append((String) string);
    }
    return sb.toString();
}, m1, m2, m3);
System.out.println("Combined " + combined.block());

When m3 is added, the combinator is skipped in the response is null. When I remove m3, it all works as expected and "AB" is returned. Is there a way I could handle this by detecting the empty Mono? Also, is there a way to have the combinator method know the type of the object instead of having to cast?

like image 472
athom Avatar asked Aug 17 '17 05:08

athom


People also ask

What is mono empty?

Mono. empty() is a method invocation that returns a Mono that completes emitting no item. It represents an empty publisher that only calls onSubscribe and onComplete . This Publisher is effectively stateless, and only a single instance exists.

What is mono zip used for?

Purpose of Mono zip The zip method of Mono combines the result of multiple Mono streams collectively as a single result. It would create a new Mono only when all the provided Monos produces an element, aggregating their values into a Tuple . Note that the zip accepts a maximum of 8 Monos.

Does Mono zip run in parallel?

The zip method internally subscribes to the provided Monos, waits for all of them to complete and emit a value. Normally, the subscribe callback executes on a common thread. However, you can make the provided Monos execute in parallel by configuring schedulers subscribeOn(Scheduler) .


2 Answers

The zip operator doesn't behave like this. It would be in fact counter-intuitive: your code is expecting a Tuple of 3 elements and you're only getting two?!?

In this case, you're in control and only you can decide what's a good default value if none is provided (remember, null values are forbidden by the reactive streams specification).

Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.empty().defaultIfEmpty("");

Mono<String> combined = Mono.when(m1, m2, m3).map(t -> {
    StringBuffer sb = new StringBuffer();
    sb.append(t.getT1());
    sb.append(t.getT2());
    sb.append(t.getT3());
    return sb.toString();
});

Edit

You seem to be confused by the nature of a Publisher type, see:

if one of the Monos is an empty Mono, zip fails without an error

and

So if I was to try and zip Mono's and for some reason one is empty, the zip would fail and I cannot seem to put in any code to safeguard against that

An empty Mono isn't a failure case: it's just that no value is emitted and it is completed successfully. You can verify that by changing the code sample:

    combined.subscribe(
            s -> System.out.println("element: " + s), // doesn't execute
            s -> System.out.println("error: " + s), // doesn't execute
            () -> { System.out.println("complete!"); // prints
    });

So depending on your requirements, you can:

  • apply a defaultIfEmpty operator on those 3 Mono instances, if there are convenient default values you can rely on
  • apply a defaultIfEmpty operator on the combined Mono, with a default value or even transform that into an error message with combined.switchIfEmpty(Mono.error(...))
like image 56
Brian Clozel Avatar answered Oct 28 '22 23:10

Brian Clozel


In case of String it is quite easy to define a default value for the empty case which solves the issue nicely as described in Brian's answer. However, for other custom types it might be difficult to create an empty object for one reason or another. An alternative for these cases is to use Optional. This solution has some heavy boilerplate, though.

Mono<Optional<String>> m1 = Mono.just("A").map(Optional::of).defaultIfEmpty(Optional.empty());
Mono<Optional<String>> m2 = Mono.just("B").map(Optional::of).defaultIfEmpty(Optional.empty());
Mono<Optional<String>> m3 = Mono.<String>empty().map(Optional::of).defaultIfEmpty(Optional.empty());

Mono<String> combined = Mono.zip(strings -> {
    StringBuffer sb = new StringBuffer();
    for (Object string : strings) {
        ((Optional<String>) string).ifPresent(sb::append);
    }
    return sb.toString();
}, m1, m2, m3);
System.out.println("Combined " + combined.block());
like image 38
Martin Tarjányi Avatar answered Oct 28 '22 23:10

Martin Tarjányi