I have a sequence of Mono transformations using flatMap. I managed to simplify my production code to this test case:
    @Test
    public void test() {
        AtomicInteger iCounter = new AtomicInteger(1);
        Mono<String> iValueMono = Mono.fromSupplier(() -> {
            int iValue = iCounter.getAndIncrement();
            System.out.println("iValueMono CALL: " + iValue);
            return String.valueOf(iValue);
        });
        Mono<String> resultMono = Mono.just("X")
                .flatMap(append(iValueMono))
                .flatMap(append(iValueMono));
        StepVerifier.create(resultMono)
                .consumeNextWith(result -> assertThat(result).isEqualTo("X11"))
                .expectComplete()
                .verify();
    }

    private Function<String, Mono<String>> append(Mono<String> sMono) {
        return s -> sMono.map(v -> s + v);
    }
This prints:
iValueMono CALL: 1
iValueMono CALL: 2
org.junit.ComparisonFailure:
Expected :"X11"
Actual :"X12"
I thought - I see now that it was incorrect - that each time I map the iValueMono in the append() call, the supplier is re-executed to produce a new value. I cannot change how iValueMono is implemented in the production code (e.g. to make it stateful so that it stores the value). How can I implement this so that the value supplier is only called once and I get the final result "X11"?
Of course, I'm interested in a non-blocking, reactive way to do this.
Using Mono.cache() is the answer:
Turn this Mono into a hot source and cache last emitted signals for further Subscriber.
Using it:
    Mono<String> iValueMono = Mono.fromSupplier(() -> {
        int iValue = iCounter.getAndIncrement();
        System.out.println("iValueMono CALL: " + iValue);
        return String.valueOf(iValue);
    }).cache();
delivers the desired result of calling the supplier only once.
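To make the difference concrete, here is a rough plain-Java analogy, not Reactor code and not how Reactor implements cache() internally. It assumes a cold Mono can be pictured as a Supplier that is re-invoked for every subscriber, and a cached Mono as a memoized Supplier. The names CacheAnalogy, memoize and appendTwice are made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CacheAnalogy {

    // Hypothetical helper: the delegate runs at most once,
    // and every later call returns the stored value.
    static <T> Supplier<T> memoize(Supplier<T> delegate) {
        return new Supplier<T>() {
            private boolean computed;
            private T value;

            @Override
            public synchronized T get() {
                if (!computed) {
                    value = delegate.get();
                    computed = true;
                }
                return value;
            }
        };
    }

    // Stands in for the two chained append(...) steps:
    // each step pulls one value from the source.
    static String appendTwice(String start, Supplier<String> source) {
        String once = start + source.get();
        return once + source.get();
    }

    public static void main(String[] args) {
        // "Cold" source: the supplier body runs on every pull.
        AtomicInteger coldCounter = new AtomicInteger(1);
        Supplier<String> cold = () -> String.valueOf(coldCounter.getAndIncrement());
        System.out.println(appendTwice("X", cold));   // prints X12

        // "Cached" source: the supplier body runs once, then the value is replayed.
        AtomicInteger cachedCounter = new AtomicInteger(1);
        Supplier<String> cached = memoize(() -> String.valueOf(cachedCounter.getAndIncrement()));
        System.out.println(appendTwice("X", cached)); // prints X11
    }
}
```

The first call corresponds to the failing test (each flatMap subscribes to iValueMono again), the second to the version with .cache().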