 

Rx equivalent of COUNT with GROUP BY?

I have a list of recurring elements, say:

Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A");

I would like to group them by their value along with how many times they appear, so the output would be pairs of:

{"A", 3}, {"B", 1}, {"C", 2}

Basically, the equivalent of an SQL statement like SELECT x, COUNT(1) FROM source GROUP BY x;
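For comparison, the same GROUP BY/COUNT aggregation over an ordinary in-memory list can be written with the JDK's Collectors.groupingBy and Collectors.counting. This is a plain-Java analogue of the desired result, not an Rx solution, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class GroupCountStreams {
    // Rough equivalent of SELECT x, COUNT(1) ... GROUP BY x for an in-memory list.
    static Map<String, Long> countByValue(List<String> values) {
        return values.stream()
                .collect(Collectors.groupingBy(
                        Function.identity(),      // group key: the element itself
                        TreeMap::new,             // sorted map, so the printed order is stable
                        Collectors.counting()));  // aggregate: number of elements per group
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countByValue(List.of("A", "B", "A", "C", "C", "A"));
        System.out.println(counts); // {A=3, B=1, C=2}
    }
}
```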

I only got as far as calling groupBy on them:

source.groupBy(x -> x, x -> 1)

But this transforms the stream into GroupedObservables, and I couldn't find a good example of how to proceed from there. I tried reduce(), but it doesn't work here: after groupBy() it reduces the stream of GroupedObservables, not the elements inside each group.

Is this possible with GroupedObservables? Is it possible any other way to achieve the desired result?

zsolt.kocsi asked Dec 03 '22 23:12


2 Answers

The following code:

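// RxJava 1.x: count() returns Observable<Integer>, so flatMap merges the per-group results.
// (In RxJava 2/3, count() returns Single<Long>; use flatMapSingle instead of flatMap there.)
// Pair is any key/value holder whose toString() prints "key=value", such as javafx.util.Pair.
source.groupBy(val -> val)
        .flatMap(gr -> gr.count()
                .map(count -> new Pair<>(gr.getKey(), count)))
        .subscribe(System.out::println);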

Would print out:

A=3
B=1
C=2
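To see why the count has to happen inside flatMap rather than in a reduce on the outer stream, here is a plain-Java model of the two stages (no RxJava; the class and method names are hypothetical). The first stage plays the role of groupBy and yields keyed sub-collections; the second counts each group on its own and flattens the key/count pairs back into one sequence. It models only the semantics; the Rx version is asynchronous and emits each pair when its group completes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupThenCount {
    // Stage 1 (like groupBy): split elements into keyed groups, keeping first-seen key order.
    static Map<String, List<String>> groupBy(List<String> source) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String s : source) {
            groups.computeIfAbsent(s, k -> new ArrayList<>()).add(s);
        }
        return groups;
    }

    // Stage 2 (like flatMap + count): aggregate each group separately, one key/count pair per group.
    static List<Map.Entry<String, Long>> countPerGroup(Map<String, List<String>> groups) {
        List<Map.Entry<String, Long>> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            long count = group.getValue().size(); // the per-group aggregate
            result.add(Map.entry(group.getKey(), count));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> source = List.of("A", "B", "A", "C", "C", "A");
        // Prints A=3, B=1 and C=2, one per line.
        countPerGroup(groupBy(source)).forEach(System.out::println);
    }
}
```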
miensol answered Jan 06 '23 12:01


Another way is to use the collect or collectInto operator, as shown below.

Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A");

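// MutableInt comes from Apache Commons Lang (org.apache.commons.lang3.mutable.MutableInt).
source.collectInto(new HashMap<String, MutableInt>(), (map, elem) -> {
    if (map.containsKey(elem)) {
        map.get(elem).increment();        // seen before: bump its counter
    } else {
        map.put(elem, new MutableInt(1)); // first occurrence of this key
    }
}).subscribe(System.out::println);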
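If Apache Commons Lang is not on the classpath, the accumulator can be written with the JDK's Map.merge, which stores 1 for a new key and otherwise adds 1 to the existing value. The sketch below runs that accumulator over a plain list so it needs only the JDK (the class name is hypothetical). With a HashMap<String, Integer> as the initial value, the lambda (map, elem) -> map.merge(elem, 1, Integer::sum) should be usable as the collectInto accumulator in place of the one above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

public class MergeCounting {
    // Same shape as the collectInto accumulator: (container, element) -> void.
    static final BiConsumer<Map<String, Integer>, String> ACCUMULATOR =
            (map, elem) -> map.merge(elem, 1, Integer::sum); // 1 for a new key, else old + 1

    static Map<String, Integer> count(List<String> source) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only to make printing deterministic
        for (String elem : source) {
            ACCUMULATOR.accept(counts, elem);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("A", "B", "A", "C", "C", "A"))); // {A=3, B=1, C=2}
    }
}
```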

By the way, if we were using Reactor, collect would be the recommended way of doing this, because with a large number of groups, a flatMap after groupBy can hang.

From the Reactor javadoc:

Note that groupBy works best with a low cardinality of groups, so chose your keyMapper function accordingly.
...
Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

There is also a GitHub issue related to this.

chao_chang answered Jan 06 '23 12:01