I started using Project reactor and one place where I'm struggling little is how do I combine things coming from Mono with Flux. Here's my use case:
public interface GroupRepository {
Mono<GroupModel> getGroup(Long groupId);
}
public interface UserRepository {
Flux<User> getUsers(Set<Long> userIds);
}
Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.
Now what I want to achieve is:
How can I combine response from UserFlux and associate those users with that group, with something like group.addUsers(userfromFlux).
Can someone help with how to combine results coming from userFlux and groupMono. I think I use something like Zip but then it does one to one mapping from source. In my case, I need to do 1 to N mapping. Here I have one group but multiple user that I need to add to that group. Is it a good idea to return Mono<List<Users>
and then use zip operator with both mono and provide a combinator as mentioned herepublic static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
final BiFunction<? super T1, ? super T2, ? extends O> combinator)
?
Mono and Flux use the Decorator design pattern heavily to change the behaviour of original mono/flux publisher at runtime. This gives developers the flexibility to change the behaviour of the data stream at runtime.
Mono and Flux are both implementations of the Publisher interface. In simple terms, we can say that when we're doing something like a computation or making a request to a database or an external service, and expecting a maximum of one result, then we should use Mono.
The concat method works as shown here. It connects to the data sources sequentially in the given order and collects all the data emitted from the sources. That is, it first collects all the data from source1, then connects to source2 and collects the data and so on. If you see, first we see only the source1 data.
This 1 to N mapping sounds similar to a question I gave an answer to here:
Can you Flux.zip a mono and a flux and and repeat the mono value for every flux value?
Incase that link goes down, here's the answer again. I don't think this method would have good performance as the mono would be recomputed every time. For better performance, if your Mono is wraps around a slow operation, it might be good to have some caching layer.
Suppose you have a flux and a mono like this:
// a flux that contains 6 elements.
final Flux<Integer> userIds = Flux.fromIterable(List.of(1,2,3,4,5,6));
// a mono of 1 element.
final Mono<String> groupLabel = Mono.just("someGroupLabel");
First, I'll show you the wrong way of trying to zip the 2 which I tried, and I think other people would try:
// wrong way - this will only emit 1 event
final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
.zipWith(groupLabel);
// you'll see that onNext() is only called once,
// emitting 1 item from the mono and first item from the flux.
wrongWayOfZippingFluxToMono
.log()
.subscribe();
// this is how to zip up the flux and mono how you'd want,
// such that every time the flux emits, the mono emits.
final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
.flatMap(userId -> Mono.just(userId)
.zipWith(groupLabel));
// you'll see that onNext() is called 6 times here, as desired.
correctWayOfZippingFluxToMono
.log()
.subscribe();
I think Flux.combineLatest
static method could help you there: since your Mono
only ever emits 1 element, that element will always be the one combined with each incoming value from the Flux
.
Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
groupMono, userFlux);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With