I have an observable which emits strings, and I want to group them by their first character. It's easy to do with `groupBy`, like this:
```java
import java.util.List;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");
Observable<List<String>> groupedRows = rows.groupBy(new Func1<String, Character>() {
    // Key each row by its first character
    public Character call(String row) {
        return row.charAt(0);
    }
}).flatMap(new Func1<GroupedObservable<Character, String>, Observable<List<String>>>() {
    // Collect each group into a list; toList() emits only once the group completes
    public Observable<List<String>> call(GroupedObservable<Character, String> group) {
        return group.toList();
    }
});
groupedRows.toBlocking().forEach(new Action1<List<String>>() {
    public void call(List<String> group) {
        System.out.println(group);
    }
});
// Output:
// [aa, ab, ac]
// [bb, bc]
// [cc]
```
But that's not good for my purposes, because `groupBy` only completes each group when the source observable calls `onCompleted`, so `toList()` cannot emit any group before then. If I have lots of rows, they will all be gathered in memory and only "flushed" and written to output after the very last row.
I need something like the `buffer` operator, but with my own function that marks the boundary of each group. I implemented it like this (knowing that the rows are always ordered alphabetically):
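```java
import java.util.List;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.ConnectableObservable;

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");
ConnectableObservable<String> connectableRows = rows.publish();
Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() {
    private char lastChar = 0;

    // Lets a row through only when its first character differs from the previous
    // row's, i.e. when a new group starts (relies on the rows being sorted)
    public Boolean call(String row) {
        char currentChar = row.charAt(0);
        boolean isNewGroup = lastChar != 0 && (currentChar != lastChar);
        lastChar = currentChar;
        return isNewGroup;
    }
});
// Close the current buffer each time boundarySelector emits
Observable<List<String>> groupedRows = connectableRows.buffer(boundarySelector);
connectableRows.connect();
groupedRows.toBlocking().forEach(new Action1<List<String>>() {
    public void call(List<String> group) {
        System.out.println(group);
    }
});
// Output:
// []
// []
// []
```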
It doesn't work, because `boundarySelector` is "eating" the rows. I find that weird, because I specifically used a `ConnectableObservable` to signal that I need two subscribers (`boundarySelector` and `groupedRows`) before `rows` starts emitting.
Curiously, if I delay `rows` by 1 second, this code works.
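One thing worth checking in the snippet above: `connectableRows.connect()` is called before `groupedRows` is subscribed, and `buffer(boundarySelector)` only subscribes to `boundarySelector` once `groupedRows` itself gets a subscriber. Because `Observable.just` emits synchronously, `connect()` can push every row before either consumer is attached, which would also fit the observation that delaying `rows` changes the outcome. The sketch below is plain Java, not RxJava: `HotSource`, `subscribeThenConnect` and `connectThenSubscribe` are hypothetical names for a minimal stand-in that only models the "hot source" rule, namely that items go to whoever is subscribed at the moment `connect()` runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class HotSourceDemo {
    // Hypothetical minimal hot source (not RxJava): items are pushed only to
    // subscribers that are registered at the moment connect() runs.
    static final class HotSource<T> {
        private final List<T> items;
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        HotSource(List<T> items) {
            this.items = items;
        }

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        // Emits every item synchronously, similar to connecting a published Observable.just(...)
        void connect() {
            for (T item : items) {
                for (Consumer<T> s : subscribers) {
                    s.accept(item);
                }
            }
        }
    }

    public static List<String> subscribeThenConnect(List<String> rows) {
        HotSource<String> source = new HotSource<>(rows);
        List<String> received = new ArrayList<>();
        source.subscribe(received::add); // subscriber is attached first
        source.connect();                // so it sees every row
        return received;
    }

    public static List<String> connectThenSubscribe(List<String> rows) {
        HotSource<String> source = new HotSource<>(rows);
        List<String> received = new ArrayList<>();
        source.connect();                // all rows are emitted to nobody
        source.subscribe(received::add); // too late: nothing is replayed
        return received;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("aa", "ab", "ac", "bb", "bc", "cc");
        System.out.println(subscribeThenConnect(rows)); // [aa, ab, ac, bb, bc, cc]
        System.out.println(connectThenSubscribe(rows)); // []
    }
}
```

In RxJava terms, the analogous ordering would be to subscribe to `groupedRows` (non-blocking) before calling `connect()`; whether that alone fixes the buffering for a given RxJava version is not something this sketch demonstrates.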
So the question is: how do I group an arbitrary number of rows using my own boundary function?
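For comparison, the semantics being asked for (close the current group as soon as a user-supplied boundary function says a new one starts, so at most one group is held in memory) can be sketched without any Rx library. `BoundaryBuffer`, `onNext` and `onComplete` are hypothetical names chosen to mirror an observer; this is a plain-Java illustration that assumes, as the question does, that rows arrive already ordered, not an RxJava operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

public class BoundaryBuffer<T> {
    private final BiPredicate<T, T> isBoundary; // (previous, current) -> true if current starts a new group
    private final Consumer<List<T>> downstream;  // receives each finished group
    private List<T> current = new ArrayList<>();

    public BoundaryBuffer(BiPredicate<T, T> isBoundary, Consumer<List<T>> downstream) {
        this.isBoundary = isBoundary;
        this.downstream = downstream;
    }

    // Called once per incoming row; hands the previous group downstream when a boundary is seen.
    public void onNext(T item) {
        if (!current.isEmpty() && isBoundary.test(current.get(current.size() - 1), item)) {
            downstream.accept(current);
            current = new ArrayList<>();
        }
        current.add(item);
    }

    // Called when the source ends; flushes the last group.
    public void onComplete() {
        if (!current.isEmpty()) {
            downstream.accept(current);
            current = new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        BoundaryBuffer<String> buffer = new BoundaryBuffer<>(
                (prev, row) -> prev.charAt(0) != row.charAt(0), // new group when the first character changes
                group -> System.out.println(group));            // printed as soon as the group is closed
        for (String row : Arrays.asList("aa", "ab", "ac", "bb", "bc", "cc")) {
            buffer.onNext(row);
        }
        buffer.onComplete();
        // Prints [aa, ab, ac], then [bb, bc], then [cc]
    }
}
```

A group is only known to be finished when the first row of the next group arrives (or the source ends), so `[aa, ab, ac]` is emitted on seeing `bb`, not on seeing `ac`.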
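```java
import java.util.AbstractMap;
import rx.Observable;

Observable<Integer> source = Observable.range(0, 100);
source
    .groupBy(k -> k / 10)
    // Share the stream of groups: the main chain builds one pipeline per key,
    // while takeUntil(groups) completes each group when the next group appears
    .publish(groups -> groups
        // Pair each group's key with the group truncated at the next group's arrival
        .map(g -> new AbstractMap.SimpleImmutableEntry<>(g.getKey(), g.takeUntil(groups)))
        .flatMap(kv -> kv.getValue()
            .doOnNext(v -> System.out.println(kv.getKey() + " value " + v))
            .doOnCompleted(() -> System.out.println(kv.getKey() + " done"))))
    .subscribe();
```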
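The key idea in the answer is that each group is completed (through `takeUntil(groups)`) as soon as the next group appears, rather than when the whole source completes. The plain-Java sketch below is not RxJava: `KeyChangeGrouper` and `run` are hypothetical names, and it only replays roughly the same per-key events for `0..99` keyed by `k / 10` to show when each "done" fires. The exact interleaving of "value" and "done" lines printed by the RxJava pipeline may differ.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;

public class KeyChangeGrouper {
    // Streams values through, emitting "<key> value <v>" per item and "<key> done"
    // as soon as the key changes or the input ends. Only the key of the currently
    // open group is kept between items.
    public static <T, K> void run(Iterable<T> source, Function<T, K> keyOf, Consumer<String> sink) {
        K openKey = null;
        boolean open = false;
        for (T value : source) {
            K key = keyOf.apply(value);
            if (open && !Objects.equals(key, openKey)) {
                sink.accept(openKey + " done"); // next group has started: close the previous one
            }
            openKey = key;
            open = true;
            sink.accept(key + " value " + value);
        }
        if (open) {
            sink.accept(openKey + " done"); // source completed: close the last group
        }
    }

    public static void main(String[] args) {
        // Lazily produces 0..99, so the input itself is never materialised as a list
        Iterable<Integer> source = () -> IntStream.range(0, 100).iterator();
        run(source, k -> k / 10, System.out::println);
    }
}
```

Because each group is closed on a key change, this only matches grouping by key when equal keys are adjacent, which is the same assumption the question makes about its sorted rows.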