
RxJava: How to group an arbitrarily large number of items using my own boundary function

Tags: java, rx-java

I have an observable that emits strings, and I want to group them by their first character. That's easy to do with groupBy:

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");

Observable<List<String>> groupedRows = rows.groupBy(new Func1<String, Character>() {
  public Character call(String row) {
    return row.charAt(0);
  }
}).flatMap(new Func1<GroupedObservable<Character, String>, Observable<List<String>>>() {
  public Observable<List<String>> call(GroupedObservable<Character, String> group) {
    return group.toList();
  }
});

groupedRows.toBlocking().forEach(new Action1<List<String>>() {
  public void call(List<String> group) {
    System.out.println(group);
  }
});

// Output:
// [aa, ab, ac]
// [bb, bc]
// [cc]

But that doesn't suit my purposes, because groupBy completes each group only when the source observable emits onCompleted. With a large number of rows, everything is held in memory and is only flushed to the output after the very last row arrives.

I need something like the buffer operator, but with my own function that marks the boundary of each group. I implemented it like this (knowing that the rows are always ordered alphabetically):

Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");

ConnectableObservable<String> connectableRows = rows.publish();

Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() {
  private char lastChar = 0;

  public Boolean call(String row) {
    char currentChar = row.charAt(0);
    boolean isNewGroup = lastChar != 0 && (currentChar != lastChar);
    lastChar = currentChar;
    return isNewGroup;
  }
});

Observable<List<String>> groupedRows = connectableRows.buffer(boundarySelector);

connectableRows.connect();

groupedRows.toBlocking().forEach(new Action1<List<String>>() {
  public void call(List<String> group) {
    System.out.println(group);
  }
});

// Output:
// []
// []
// []

This doesn't work, because boundarySelector seems to be "eating" the rows. I find that odd, because I deliberately used ConnectableObservable to indicate that I need two subscribers (boundarySelector and groupedRows) before the rows start emitting.

Curiously, if I delay the rows by 1 second, this code works.

So the question is: how do I group an arbitrary number of rows using my own boundary function?

Wojciech Gdela asked Nov 01 '22 01:11


1 Answer

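// Note: Pair here stands for any simple tuple helper exposing of(), first and second;
// it is not part of RxJava.
Observable<Integer> source = Observable.range(0, 100);

source
.groupBy(k -> k / 10)      // the key changes after every 10 items
.publish(groups -> groups  // share the stream of groups so it can also serve as the boundary signal
        // end the current group as soon as the next group shows up
        .map(g -> Pair.of(g.getKey(), g.takeUntil(groups)))
        .flatMap(kv ->
            kv.second
            .doOnNext(v -> System.out.println(kv.first + " value " + v))
            .doOnCompleted(() -> System.out.println(kv.first + " done"))
        ))
.subscribe();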
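
For comparison, the behaviour the question asks for can be sketched in plain Java, without RxJava: walk the rows once, and hand over each group the moment the boundary function reports that a new group has started, so that at most one group is held in memory. This is only an illustration of the intended semantics, not a replacement for the operator chain above. The class name BoundaryGrouping and the method groupByBoundary are made up for this sketch and are not part of any library. It assumes, as in the question, that the rows arrive already ordered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

public class BoundaryGrouping {

    // Hypothetical helper: passes each finished group to `sink` as soon as
    // isBoundary(previous, current) says that `current` starts a new group.
    public static <T> void groupByBoundary(Iterable<T> rows,
                                           BiPredicate<T, T> isBoundary,
                                           Consumer<List<T>> sink) {
        List<T> current = new ArrayList<>();
        T previous = null;
        for (T row : rows) {
            if (!current.isEmpty() && isBoundary.test(previous, row)) {
                sink.accept(current);        // flush the finished group right away
                current = new ArrayList<>(); // start collecting the next group
            }
            current.add(row);
            previous = row;
        }
        if (!current.isEmpty()) {
            sink.accept(current);            // flush the trailing group at end of input
        }
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("aa", "ab", "ac", "bb", "bc", "cc");
        // A boundary is any change in the first character.
        groupByBoundary(rows,
                (prev, cur) -> prev.charAt(0) != cur.charAt(0),
                System.out::println);
        // Prints:
        // [aa, ab, ac]
        // [bb, bc]
        // [cc]
    }
}
```

The boundary function receives the previous and the current row, which avoids the mutable lastChar field used in the question's filter.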
akarnokd answered Nov 15 '22 06:11