Combining result from Flux to result from Mono

I started using Project Reactor, and one place where I'm struggling a little is how to combine results coming from a Mono with results coming from a Flux. Here's my use case:

public interface GroupRepository {
    Mono<GroupModel> getGroup(Long groupId);
}

public interface UserRepository {
    Flux<User> getUsers(Set<Long> userIds);
}

Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(userIds);
// Run the two calls above in parallel and associate the users with the group.

What I want to achieve is to combine the response from userFlux and associate those users with the group, with something like group.addUsers(userFromFlux).

Can someone help with how to combine the results coming from userFlux and groupMono? I think I should use something like zip, but it does a one-to-one mapping between the sources. In my case I need a 1-to-N mapping: I have one group but multiple users that I need to add to that group. Is it a good idea to return a Mono<List<User>> instead and then use the zip operator on both Monos, providing a combinator as in this signature?

public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)
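For reference, the idea described above (collect the users into a Mono<List<User>> and zip it with the group Mono) could be sketched roughly as follows. This is only a minimal sketch: the User and GroupModel classes and the in-memory getGroup/getUsers methods are hypothetical stand-ins for the real model and repositories, and it uses Mono.zip with a combinator rather than the Flux.zip overload quoted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GroupWithUsersSketch {

    // Hypothetical stand-ins for the User and GroupModel types (records need Java 16+).
    record User(long id) {}

    static final class GroupModel {
        final long id;
        final List<User> users = new ArrayList<>();
        GroupModel(long id) { this.id = id; }
        void addUsers(List<User> newUsers) { users.addAll(newUsers); }
    }

    // Hypothetical in-memory replacements for the two repositories.
    static Mono<GroupModel> getGroup(long groupId) {
        return Mono.fromSupplier(() -> new GroupModel(groupId));
    }

    static Flux<User> getUsers(Set<Long> userIds) {
        return Flux.fromIterable(userIds).map(User::new);
    }

    static Mono<GroupModel> groupWithUsers(long groupId, Set<Long> userIds) {
        Mono<GroupModel> groupMono = getGroup(groupId);
        // collectList() turns the N users into a single Mono<List<User>>.
        Mono<List<User>> usersMono = getUsers(userIds).collectList();
        // Mono.zip subscribes to both sources and combines their single values.
        return Mono.zip(groupMono, usersMono, (group, users) -> {
            group.addUsers(users);
            return group;
        });
    }

    public static void main(String[] args) {
        GroupModel group = groupWithUsers(42L, Set.of(1L, 2L, 3L)).block();
        System.out.println("group " + group.id + " has " + group.users.size() + " users");
    }
}
```

Whether the two lookups actually overlap in time depends on the sources being asynchronous; zip only guarantees that both are subscribed to and that the combinator runs once both have produced their value.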

User5817351 asked Apr 19 '17 21:04


2 Answers

This 1-to-N mapping sounds similar to a question I answered here:

Can you Flux.zip a mono and a flux and repeat the mono value for every flux value?

In case that link goes down, here's the answer again. I don't think this method performs well, because the Mono is subscribed to again, and therefore recomputed, for every element of the Flux. If your Mono wraps a slow operation, it might be worth adding a caching layer.

Suppose you have a flux and a mono like this:

 // a flux that contains 6 elements.
 final Flux<Integer> userIds = Flux.fromIterable(List.of(1,2,3,4,5,6));

 // a mono of 1 element.
 final Mono<String> groupLabel = Mono.just("someGroupLabel");

First, I'll show you the wrong way of zipping the two, which is what I tried first and what I think other people would try too:

 // wrong way - this will only emit 1 event 
 final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
         .zipWith(groupLabel);

 // you'll see that onNext() is only called once, 
 //     emitting 1 item from the mono and first item from the flux.
 wrongWayOfZippingFluxToMono
         .log()
         .subscribe();


 // this is how to zip up the flux and mono how you'd want, 
 //     such that every time the flux emits, the mono emits. 
 final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
         .flatMap(userId -> Mono.just(userId)
                 .zipWith(groupLabel));

 // you'll see that onNext() is called 6 times here, as desired. 
 correctWayOfZippingFluxToMono
         .log()
         .subscribe();
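To illustrate the recomputation concern mentioned at the start of this answer, here is a minimal sketch that counts how often the Mono's source runs with and without Mono.cache(). The counter and the Mono.fromSupplier stand-in for a slow lookup are assumptions made for the example; cache() is only one possible form of a "caching layer".

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CachedMonoZipSketch {

    // Zips every user id with the label and reports how often the label was computed.
    static int countLabelComputations(boolean useCache) {
        AtomicInteger computations = new AtomicInteger();
        // Stand-in for a slow lookup: the supplier runs once per subscription.
        Mono<String> label = Mono.fromSupplier(() -> {
            computations.incrementAndGet();
            return "someGroupLabel";
        });
        // cache() replays the first result to later subscribers instead of recomputing it.
        Mono<String> groupLabel = useCache ? label.cache() : label;

        Flux.just(1, 2, 3, 4, 5, 6)
                .flatMap(userId -> Mono.just(userId).zipWith(groupLabel))
                .blockLast();
        return computations.get();
    }

    public static void main(String[] args) {
        System.out.println("uncached: " + countLabelComputations(false)); // prints "uncached: 6"
        System.out.println("cached: " + countLabelComputations(true));    // prints "cached: 1"
    }
}
```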


SomeGuy answered Dec 18 '22 00:12


I think the Flux.combineLatest static method could help you there: since your Mono only ever emits one element, that element will always be the one combined with each incoming value from the Flux.

Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
                   groupMono, userFlux);
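A self-contained variant of the snippet above, as a rough sketch: plain String and Integer values stand in for GroupModel and User, and string concatenation stands in for the Combination class.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CombineLatestSketch {

    // Combines the single group value with every user id emitted by the Flux.
    static List<String> combine() {
        Mono<String> groupMono = Mono.just("groupA");    // stand-in for the group
        Flux<Integer> userFlux = Flux.just(1, 2, 3);     // stand-in for the users
        return Flux.combineLatest(
                        // arr[0] is the latest group value, arr[1] the latest user value
                        arr -> arr[0] + ":" + arr[1],
                        groupMono, userFlux)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(combine());
    }
}
```

One thing to keep in mind: combineLatest only starts emitting once every source has produced a value, and it keeps just the latest value per source. If the Flux emits several values before the Mono has resolved, the earlier ones would not each be combined, so this approach fits best when the Mono resolves first.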
Simon Baslé answered Dec 17 '22 23:12