I am using Spring Reactor 3.1.0.M3 and have a use case where I need to merge Mono's from multiple sources. I found that if one of the Monos is an empty Mono, zip fails without an error.
Example:
Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.empty();
Mono<String> combined = Mono.zip(strings -> {
StringBuffer sb = new StringBuffer();
for (Object string : strings) {
sb.append((String) string);
}
return sb.toString();
}, m1, m2, m3);
System.out.println("Combined " + combined.block());
When m3 is added, the combinator is skipped in the response is null. When I remove m3, it all works as expected and "AB" is returned. Is there a way I could handle this by detecting the empty Mono? Also, is there a way to have the combinator method know the type of the object instead of having to cast?
Mono. empty() is a method invocation that returns a Mono that completes emitting no item. It represents an empty publisher that only calls onSubscribe and onComplete . This Publisher is effectively stateless, and only a single instance exists.
Purpose of Mono zip The zip method of Mono combines the result of multiple Mono streams collectively as a single result. It would create a new Mono only when all the provided Monos produces an element, aggregating their values into a Tuple . Note that the zip accepts a maximum of 8 Monos.
The zip method internally subscribes to the provided Monos, waits for all of them to complete and emit a value. Normally, the subscribe callback executes on a common thread. However, you can make the provided Monos execute in parallel by configuring schedulers subscribeOn(Scheduler) .
The zip operator doesn't behave like this. It would be in fact counter-intuitive: your code is expecting a Tuple of 3 elements and you're only getting two?!?
In this case, you're in control and only you can decide what's a good default value if none is provided (remember, null
values are forbidden by the reactive streams specification).
Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.empty().defaultIfEmpty("");
Mono<String> combined = Mono.when(m1, m2, m3).map(t -> {
StringBuffer sb = new StringBuffer();
sb.append(t.getT1());
sb.append(t.getT2());
sb.append(t.getT3());
return sb.toString();
});
You seem to be confused by the nature of a Publisher
type, see:
if one of the Monos is an empty Mono, zip fails without an error
and
So if I was to try and zip Mono's and for some reason one is empty, the zip would fail and I cannot seem to put in any code to safeguard against that
An empty Mono
isn't a failure case: it's just that no value is emitted and it is completed successfully. You can verify that by changing the code sample:
combined.subscribe(
s -> System.out.println("element: " + s), // doesn't execute
s -> System.out.println("error: " + s), // doesn't execute
() -> { System.out.println("complete!"); // prints
});
So depending on your requirements, you can:
defaultIfEmpty
operator on those 3 Mono
instances, if there are convenient default values you can rely ondefaultIfEmpty
operator on the combined Mono
, with a default value or even transform that into an error message with combined.switchIfEmpty(Mono.error(...))
In case of String
it is quite easy to define a default value for the empty case which solves the issue nicely as described in Brian's answer. However, for other custom types it might be difficult to create an empty object for one reason or another. An alternative for these cases is to use Optional
. This solution has some heavy boilerplate, though.
Mono<Optional<String>> m1 = Mono.just("A").map(Optional::of).defaultIfEmpty(Optional.empty());
Mono<Optional<String>> m2 = Mono.just("B").map(Optional::of).defaultIfEmpty(Optional.empty());
Mono<Optional<String>> m3 = Mono.<String>empty().map(Optional::of).defaultIfEmpty(Optional.empty());
Mono<String> combined = Mono.zip(strings -> {
StringBuffer sb = new StringBuffer();
for (Object string : strings) {
((Optional<String>) string).ifPresent(sb::append);
}
return sb.toString();
}, m1, m2, m3);
System.out.println("Combined " + combined.block());
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With