Is it possible to do something like the code below? I have one service that makes an API call and another one which returns a stream of values. I need to modify every value by the value returned by the API call.
return Flux.zip(
        someMono.get(),
        someFlux.Get(),
        (d, t) -> {
            // here d is always the same and t is each new Flux value
        });
I've tried .repeat() on the Mono and it works, but it calls the method every time there's a new Flux value. Since it's an API call, that's not acceptable.
Is it possible?
You can accomplish this using the cache operator. Uncomment the Flux without cache and you will see getNum called at least once per zipped element (100 or more times, since zip may prefetch from the repeating source). With cache it is called once.
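import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RepeatableMono {
    // Counts how many times the simulated API call runs.
    private static final AtomicInteger numberOfCalls = new AtomicInteger(0);

    // Stands in for the API call.
    static Integer getNum() {
        System.out.println("GetNum Called: " + numberOfCalls.incrementAndGet());
        return 0;
    }

    public static void main(String[] args) {
        // Without cache(), every repeat re-runs getNum(), so this prints
        // `GetNum Called: ` at least once per zipped element (100+ times).
        //Flux<Integer> neverEndingFlux = Mono.defer(() -> Mono.just(getNum()))
        //        .repeat();

        // With cache(), getNum() runs once and repeat() replays the cached value.
        Flux<Integer> neverEndingFlux = Mono.defer(() -> Mono.just(getNum()))
                .cache()
                .repeat();

        Flux<Integer> stream = Flux.range(1, 100);
        // zip completes when the shorter source (stream) completes.
        Flux.zip(neverEndingFlux, stream, (x, y) -> x + " " + y)
                .subscribe(System.out::println);
    }
}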
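If the goal is only to apply one Mono value to every Flux element, it may be simpler to skip zip and repeat entirely. The sketch below resolves the Mono once with flatMapMany and then maps the Flux inside it. The names CombineOnce, fetchPrefix and combine are made up for illustration, and fetchPrefix is only a stand-in for the real API call, so treat this as one possible shape rather than the definitive solution.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CombineOnce {
    // Counts how often the simulated API call runs.
    static final AtomicInteger apiCalls = new AtomicInteger();

    // Hypothetical stand-in for the API call from the question.
    static Mono<String> fetchPrefix() {
        return Mono.fromCallable(() -> {
            apiCalls.incrementAndGet();
            return "prefix";
        });
    }

    // Resolve the Mono once, then map every Flux element with its value.
    static Flux<String> combine(Mono<String> single, Flux<Integer> many) {
        return single.flatMapMany(d -> many.map(t -> d + "-" + t));
    }

    public static void main(String[] args) {
        List<String> result = combine(fetchPrefix(), Flux.range(1, 3))
                .collectList()
                .block();
        System.out.println(result + ", API calls: " + apiCalls.get());
    }
}
```

Running main should print the three combined values together with an API call count of 1, because the Mono is subscribed a single time per subscription to the combined Flux. One difference from the zip approach: the Flux is not subscribed until the Mono has produced its value.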
The following illustrates how to combine a Flux with a Mono so that the Mono's value is paired with every element the Flux emits.
Suppose you have a flux and a mono like this:
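import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
// a flux that contains 6 elements.
final Flux<Integer> userIds = Flux.fromIterable(List.of(1, 2, 3, 4, 5, 6));
// a mono of 1 element.
final Mono<String> groupLabel = Mono.just("someGroupLabel");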
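First, here is the wrong way to zip the two, which I tried myself and which I think other people would try too. zip completes as soon as its shortest source completes, so a one-element Mono limits the output to a single pair: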
// wrong way - this will only emit 1 element
final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
        .zipWith(groupLabel);
// you'll see that onNext() is only called once,
// pairing the mono's item with the first item from the flux.
wrongWayOfZippingFluxToMono
        .log()
        .subscribe();

// this is how to zip the flux and the mono the way you'd want,
// such that every time the flux emits, the mono's item is emitted with it.
final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
        .flatMap(userId -> Mono.just(userId)
                .zipWith(groupLabel));
// you'll see that onNext() is called 6 times here, as desired.
correctWayOfZippingFluxToMono
        .log()
        .subscribe();
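One thing to watch with the flatMap approach: the Mono is subscribed once per Flux element. That is harmless for Mono.just, but if the Mono wraps an API call, as in the question, the call would run for every element unless the Mono is cached. Here is a minimal sketch of that difference. The names ZipWithCachedMono, fetchGroupLabel and pair are hypothetical, and fetchGroupLabel only simulates a remote lookup by counting subscriptions.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class ZipWithCachedMono {
    // Counts how often the simulated lookup runs.
    static final AtomicInteger lookups = new AtomicInteger();

    // Hypothetical stand-in for a remote lookup; runs on every subscription.
    static Mono<String> fetchGroupLabel() {
        return Mono.fromCallable(() -> {
            lookups.incrementAndGet();
            return "someGroupLabel";
        });
    }

    // Pairs each user id with the label; the Mono is subscribed once per id.
    static Flux<Tuple2<Integer, String>> pair(Flux<Integer> userIds, Mono<String> label) {
        return userIds.flatMap(userId -> Mono.just(userId).zipWith(label));
    }

    public static void main(String[] args) {
        Flux<Integer> userIds = Flux.range(1, 6);

        // Uncached: the lookup runs once for each of the 6 ids.
        pair(userIds, fetchGroupLabel()).blockLast();
        System.out.println("without cache: " + lookups.getAndSet(0));

        // Cached: the first subscription runs the lookup, later ones replay it.
        pair(userIds, fetchGroupLabel().cache()).blockLast();
        System.out.println("with cache: " + lookups.get());
    }
}
```

With these synchronous sources the first line should report 6 lookups and the second 1. Note that cache() with no arguments keeps the value indefinitely, so it only fits when the API result is allowed to stay fixed for the lifetime of that Mono instance.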
