I have a list of recurring elements, say:
Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A");
I would like to group them by their value along with how many times they appear, so the output would be pairs of:
{"A", 3}, {"B", 1}, {"C", 2}
Basically the equivalent of an SQL statement like SELECT x, COUNT(1) GROUP BY x;
I got only so far as to call groupBy on them:
source.groupBy(x -> x, x -> 1)
But this transforms the stream into GroupedObservables, and I couldn't find a good example of how to proceed with them. I tried reduce(), but it doesn't help here: after groupBy() it wants to reduce the GroupedObservables themselves, not the elements inside each group.
Is this possible with GroupedObservables? Is there any other way to achieve the desired result?
The following code:
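// Written against RxJava 1.x, where count() returns an Observable.
// On RxJava 2.x and later, count() returns a Single, so flatMapSingle is needed instead of flatMap.
// Pair is assumed to be a key/value class whose toString() prints key=value, such as javafx.util.Pair.
source.groupBy(val -> val)
      .flatMap(gr -> gr.count()
                       .map(count -> new Pair<>(gr.getKey(), count)))
      .subscribe(System.out::println);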
Would print out:
A=3
B=1
C=2
Another way is to use the collect or collectInto method, as shown below.
Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A");
source.collectInto(new HashMap<String, MutableInt>(), (map, elem) -> {
    if (map.containsKey(elem)) {
        map.get(elem).increment();
    } else {
        map.put(elem, new MutableInt(1));
    }
}).subscribe(System.out::println);
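The accumulator passed to collectInto is just a two-argument callback over the map, so its counting logic can be tried out without RxJava on the classpath. Below is a minimal sketch under some assumptions: it uses plain Integer counts with Map.merge instead of MutableInt, and the class name CountAccumulatorSketch and the helper countAll are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical class name, used only for this sketch.
public class CountAccumulatorSketch {

    // Same shape as the accumulator argument: (container, element) -> void.
    // Map.merge stores 1 for a new key, otherwise adds 1 to the existing count.
    static final BiConsumer<Map<String, Integer>, String> COUNT =
            (map, elem) -> map.merge(elem, 1, Integer::sum);

    // Hypothetical helper: feeds every element through the accumulator,
    // one call per item, the way a collecting operator would per emission.
    static Map<String, Integer> countAll(List<String> items) {
        Map<String, Integer> counts = new HashMap<>();
        for (String item : items) {
            COUNT.accept(counts, item);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                countAll(Arrays.asList("A", "B", "A", "C", "C", "A"));
        System.out.println(counts);
    }
}
```

If this behaves as expected, the same lambda body, map.merge(elem, 1, Integer::sum), should be usable as the accumulator in the collectInto call above with a HashMap<String, Integer> as the initial value, which would remove the dependency on MutableInt.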
By the way, if we were using Reactor, collect would be the recommended way of doing this, because with a large number of groups, flatMap after groupBy can hang.
From the Reactor javadoc for groupBy:
Note that groupBy works best with a low cardinality of groups, so chose your keyMapper function accordingly.
...
Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).
There is also a GitHub issue related to this.