Given the following data structure Data and a Flux<Data>, what is the idiomatic way to group the elements into a series of lists based on some property?
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;

class Scratch {

    private static class Data {
        private Integer key;
        private String value;

        public Data(Integer key, String value) {
            this.key = key;
            this.value = value;
        }

        public Integer getKey() {
            return key;
        }

        public String getValue() {
            return value;
        }

        public static Data of(Integer key, String value) {
            return new Data(key, value);
        }

        @Override
        public String toString() {
            return value;
        }
    }

    public static void main(String[] args) {
        Flux<Data> test = Flux.just(
                Data.of(1, "Hello"),
                Data.of(1, "world"),
                Data.of(2, "How"),
                Data.of(2, "are"),
                Data.of(2, "you"),
                Data.of(3, "Bye"));

        test.bufferUntil(new Predicate<Data>() {
            // Key of the previously seen element; null before the first element.
            Integer prev = null;

            @Override
            public boolean test(Data next) {
                // Cut a new buffer whenever the key differs from the previous one.
                boolean collect = prev != null && !Objects.equals(prev, next.getKey());
                prev = next.getKey();
                return collect;
            }
        }, true).subscribe(e -> System.out.println(e.toString()));
    }
}
Output:
[Hello, world]
[How, are, you]
[Bye]
I am aware of the groupBy function on Flux, but that again gives me a Flux, not a list. The solution described above works, but it does not feel 100% idiomatic because I had to use an anonymous class instead of a lambda. I could have used a lambda with an AtomicReference declared outside it, but that does not feel 100% right either. Any suggestions?
A GroupedFlux, like a Flux, must be subscribed to in order to become active. So what you need to do is somehow consume the inner Flux that groupBy produces. One typical way of doing that is by using flatMap, which already takes a Function<T, Flux> transformation. The function can be as simple as Function.
A Mono is a stream of 0 to 1 elements, whereas a Flux is a stream of 0 to N elements. This difference in semantics is useful: for example, a request to an HTTP server is expected to yield 0 or 1 response, so a Mono fits that case and a Flux would be inappropriate.
The groupingBy() method of the Collectors class in Java is used for grouping objects by some property and storing the results in a Map instance. To use it, we always need to specify the property by which the grouping is performed.
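As a rough illustration of Collectors.groupingBy() on a plain (non-reactive) list, here is a minimal sketch. The Item class, the sample() helper and the class name are hypothetical stand-ins for the Data class in the question, and the TreeMap is used only so that keys print in sorted order.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupingBySketch {

    // Hypothetical stand-in for the question's Data class.
    static final class Item {
        private final int key;
        private final String value;

        Item(int key, String value) {
            this.key = key;
            this.value = value;
        }

        int getKey() { return key; }

        String getValue() { return value; }
    }

    // Same sample data as in the question.
    static List<Item> sample() {
        return List.of(
                new Item(1, "Hello"), new Item(1, "world"),
                new Item(2, "How"), new Item(2, "are"), new Item(2, "you"),
                new Item(3, "Bye"));
    }

    // Groups the values by key into a sorted map.
    static Map<Integer, List<String>> groupValuesByKey(List<Item> items) {
        return items.stream().collect(Collectors.groupingBy(
                Item::getKey,                 // property to group by
                TreeMap::new,                 // sorted map, for stable printing
                Collectors.mapping(Item::getValue, Collectors.toList())));
    }

    public static void main(String[] args) {
        // prints {1=[Hello, world], 2=[How, are, you], 3=[Bye]}
        System.out.println(groupValuesByKey(sample()));
    }
}
```

Unlike the bufferUntil approach in the question, this collects every element with the same key into one list, whether or not the elements are adjacent, and it needs the whole collection up front.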
zip(): Flux has multiple options for combining publishers. One of them is the zip operator. Zip waits for each source to emit one element and then combines these elements. The output is a tuple with as many elements as there are publishers passed to zip.
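A small sketch of the zip behaviour described above, assuming reactor-core is on the classpath. The class and method names are made up for illustration, and block() is used only to keep the example short.

```java
import reactor.core.publisher.Flux;

import java.util.List;

public class ZipSketch {

    // Pairs each key with the value emitted at the same position.
    static List<String> zipPairs() {
        Flux<Integer> keys = Flux.just(1, 2, 3);
        Flux<String> values = Flux.just("Hello", "How", "Bye", "unpaired");

        return Flux.zip(keys, values)                      // Flux<Tuple2<Integer, String>>
                .map(t -> t.getT1() + "=" + t.getT2())     // read both tuple components
                .collectList()
                .block();                                  // blocking is for the demo only
    }

    public static void main(String[] args) {
        // prints [1=Hello, 2=How, 3=Bye]
        System.out.println(zipPairs());
    }
}
```

Because zip stops as soon as one source completes, the fourth value has no partner and never appears in the output.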
Here is a solution using the groupBy operator. I group the data by the common key. The groupBy operator gives me a Flux of GroupedFlux. GroupedFlux is a subclass of Flux, so I apply flatMap and convert each GroupedFlux to a List<Data> using the collectList operator. This gives me a Flux<List<Data>>, which I then subscribe to and print, as you asked.
test.groupBy(Data::getKey)
    .flatMap(Flux::collectList)
    .subscribe(listOfStringsHavingDataWithSameKey ->
        System.out.println(listOfStringsHavingDataWithSameKey.toString()));
Do check out the documentation for Flux and GroupedFlux.
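One difference worth keeping in mind: groupBy gathers all elements that share a key, while the bufferUntil code in the question only groups consecutive elements. If consecutive grouping is what is wanted, a possible lambda-free alternative is the bufferUntilChanged operator, sketched below. This assumes reactor-core is on the classpath in a version that provides bufferUntilChanged; the Item class, the sample() helper and the method names are hypothetical stand-ins, not part of the original answer.

```java
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.stream.Collectors;

public class ConsecutiveGroupingSketch {

    // Hypothetical stand-in for the question's Data class.
    static final class Item {
        private final int key;
        private final String value;

        Item(int key, String value) {
            this.key = key;
            this.value = value;
        }

        int getKey() { return key; }

        String getValue() { return value; }
    }

    // Same sample data as in the question.
    static Flux<Item> sample() {
        return Flux.just(
                new Item(1, "Hello"), new Item(1, "world"),
                new Item(2, "How"), new Item(2, "are"), new Item(2, "you"),
                new Item(3, "Bye"));
    }

    // Emits one list per run of consecutive elements sharing the same key.
    static List<List<String>> runsByKey(Flux<Item> source) {
        return source
                .bufferUntilChanged(Item::getKey)          // Flux<List<Item>>
                .map(run -> run.stream()
                        .map(Item::getValue)
                        .collect(Collectors.toList()))
                .collectList()
                .block();                                  // blocking is for the demo only
    }

    public static void main(String[] args) {
        // prints [[Hello, world], [How, are, you], [Bye]]
        System.out.println(runsByKey(sample()));

        // Key 1 reappears after key 2, so it starts a new run:
        // prints [[a], [b], [c]]
        System.out.println(runsByKey(Flux.just(
                new Item(1, "a"), new Item(2, "b"), new Item(1, "c"))));
    }
}
```

With groupBy, the second input would instead put "a" and "c" in the same list, so the choice between the two operators depends on whether non-adjacent elements with the same key should be merged.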