
What is a Grouped Flux and how exactly do we work with one?

I am working with a Flux of some object, let's say Flux<MovieReservation>. Each element contains info such as movie id, name, timing, title, etc. I want to extract the info needed to create a new Flux<MovieShowDetail>. My intent is to group all reservations by movie id and break the Flux down into multiple smaller Fluxes (fluxes, if that's a thing at all). Something like:

Flux {
   movie1 -> Flux<MovieShowDetail>
   movie2 -> Flux<MovieShowDetail>
   ... and so on
}

So I came across the groupBy method, which is supposed to do exactly this. However, the documentation is sparse on it, especially on how to iterate over each movie and its respective Flux.

Moreover, when I try to learn by trial and error, processing stops after the operator just before groupBy.

I have tried to do

fluxOfSomething
    .groupBy(movieReservation -> movieReservation.getMovieId(),
             movieReservation -> movieReservation)

so that I can iterate over each Flux and create the new Flux of MovieShowDetail. However, processing never reaches this block. I tried logging, but the flow never entered it.

flux
    .map( movieSomething -> do something)
    .groupBy(movieReservation -> movieReservation.getMovieId(),
             movieReservation -> movieReservation)
    .subscribe("This text doesn't get printed");

I really need as much info as you can give about this.

ayush prashar asked Mar 03 '23 21:03



1 Answer

groupBy produces a Flux<Flux<T>> (or, more precisely, a Flux<GroupedFlux<K, T>>, where each GroupedFlux exposes the key of its group).

A GroupedFlux, like a Flux, must be subscribed to to become active. So what you need to do is somehow consume the inner Flux that groupBy produces.

One typical way of doing that is flatMap, which takes a function mapping each element to a Publisher (here, each group to a Flux). The function can be as simple as Function.identity(), but if you want to further process the elements of each inner Flux, you should probably do so inside the flatMap function, because the group key is in scope of that lambda.

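movieReservations
    // one GroupedFlux per distinct movie id
    .groupBy(MovieReservation::movieId)
    // flatMap subscribes to each group, which is what makes it active
    .flatMap(idFlux -> idFlux
        .collectList()
        .map(listOfReservations ->
            // idFlux.key() is the movie id this group was created for
            new MovieInformation(idFlux.key(), listOfReservations)
        )
    );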
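For something you can run end to end, here is a minimal sketch of the same groupBy + flatMap pattern. It assumes reactor-core is on the classpath and Java 16+ for records. The MovieReservation and MovieShowDetail records, their fields, and the toShowDetails helper are hypothetical stand-ins for the types in the question, not anything Reactor provides.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class GroupByExample {
    // Hypothetical stand-ins for the question's types.
    record MovieReservation(String movieId, String seat) {}
    record MovieShowDetail(String movieId, List<String> seats) {}

    // Groups reservations by movie id and folds each group into one detail object.
    static Flux<MovieShowDetail> toShowDetails(Flux<MovieReservation> reservations) {
        return reservations
            .groupBy(MovieReservation::movieId)
            // flatMap subscribes to every inner GroupedFlux; without a
            // subscriber the groups are never consumed.
            .flatMap(group -> group
                .map(MovieReservation::seat)
                .collectList()
                // group.key() is still in scope inside this lambda
                .map(seats -> new MovieShowDetail(group.key(), seats)));
    }

    public static void main(String[] args) {
        Flux<MovieReservation> reservations = Flux.just(
            new MovieReservation("movie1", "A1"),
            new MovieReservation("movie2", "B4"),
            new MovieReservation("movie1", "A2"));

        // Nothing runs until this subscribe call. The order in which the
        // two details are printed is not guaranteed.
        toShowDetails(reservations)
            .subscribe(detail -> System.out.println(detail));
    }
}
```

Because collectList() only emits once its group completes, and groups only complete when the source completes, this shape suits finite sources. Also keep in mind that groupBy works best with a low to medium number of distinct keys, and that every group has to be consumed downstream, otherwise the pipeline can hang.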
Simon Baslé answered Mar 23 '23 02:03
