
groupBy operator, items from different groups interleaved

Tags:

rx-java

The following code:

    Observable
            .just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
            .doOnNext(item -> System.out.println("source emitting " + item))
            .groupBy(item -> {
                System.out.println("groupBy called for " + item);
                return item % 3;
            })
            .subscribe(observable -> {
                System.out.println("got observable " + observable + " for key " + observable.getKey());
                observable.subscribe(item -> {
                    System.out.println("key " + observable.getKey() + ", item " + item);
                });
            });

leaves me perplexed. The output I get is:

    source emitting 0
    groupBy called for 0
    got observable rx.observables.GroupedObservable@42110406 for key 0
    key 0, item 0
    source emitting 1
    groupBy called for 1
    got observable rx.observables.GroupedObservable@1698c449 for key 1
    key 1, item 1
    source emitting 2
    groupBy called for 2
    got observable rx.observables.GroupedObservable@5ef04b5 for key 2
    key 2, item 2
    source emitting 3
    groupBy called for 3
    key 0, item 3
    source emitting 4
    groupBy called for 4
    key 1, item 4
    source emitting 5
    groupBy called for 5
    key 2, item 5
    source emitting 6
    groupBy called for 6
    key 0, item 6
    source emitting 7
    groupBy called for 7
    key 1, item 7
    source emitting 8
    groupBy called for 8
    key 2, item 8
    source emitting 9
    groupBy called for 9
    key 0, item 9

So, in the top-level subscribe method, I get 3 GroupedObservables from groupBy, as expected. Then, one by one, I subscribe to the grouped observables, and here is the thing I don't understand:

Why are the original items still emitted in the original sequence (i.e. 0, 1, 2, 3, ...) and not 0, 3, 6, 9 ... for key 0, followed by 1, 4, 7 for key 1, followed by 2, 5, 8 for key 2?

I think I understand how the groups are created:

1. 0 is emitted, the key function is called and it gets 0
2. it is checked if an observable for 0 exists, it doesn't, so a new one is created and emitted, and then it emits 0
3. the same happens for source items 1 and 2 as they both create new groups, and observables with key 1 and 2 are emitted, and they emit 1 and 2 correspondingly
4. source item 3 is emitted, the key function is called and it gets 0
5. it is checked if an observable for 0 exists, it does -> no new grouped observable is created nor emitted, but 3 is emitted by the already existing observable
6. etc. until the source sequence is drained
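
The steps above can be sketched in plain Java without RxJava. This is only an illustration of push-based routing, not the library's actual implementation: the `Group` class, `dispatch`, and `run` are hypothetical names made up for this example. It shows that each source item is routed to its group as soon as it arrives, which is why the per-group output ends up interleaved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

class GroupByDispatchSketch {

    // Hypothetical stand-in for a grouped observable: a key plus one listener.
    static final class Group {
        final int key;
        Consumer<Integer> listener = item -> { };
        Group(int key) { this.key = key; }
    }

    // Routes every source item to its group immediately, in source order.
    static void dispatch(List<Integer> source,
                         Function<Integer, Integer> keyFn,
                         Consumer<Group> onNewGroup) {
        Map<Integer, Group> groups = new HashMap<>();
        for (Integer item : source) {
            int key = keyFn.apply(item);          // steps 1 and 4: compute the key
            Group group = groups.get(key);        // steps 2 and 5: does the group exist?
            if (group == null) {
                group = new Group(key);
                groups.put(key, group);
                onNewGroup.accept(group);         // emit the new group downstream
            }
            group.listener.accept(item);          // the group emits the item right away
        }
    }

    // Mirrors the nested subscribe calls from the question and records what happens.
    static List<String> run(List<Integer> source) {
        List<String> log = new ArrayList<>();
        dispatch(source, item -> item % 3, group -> {
            log.add("got group for key " + group.key);
            group.listener = item -> log.add("key " + group.key + ", item " + item);
        });
        return log;
    }

    public static void main(String[] args) {
        run(Arrays.asList(0, 1, 2, 3, 4, 5)).forEach(System.out::println);
    }
}
```

Nothing in this loop holds an item back: by the time the group for key 1 is created, the group for key 0 has already delivered its first item.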

It seems that although I get the grouped observables one by one, their emissions are somehow interleaved. How does this happen?

wujek asked Jun 13 '15 09:06



1 Answer

Why are the original items still emitted in the original sequence (i.e. 0, 1, 2, 3, ...) and not 0, 3, 6, 9 ... for key 0, followed by 1, 4, 7 for key 1, followed by 2, 5, 8 for key 2?

You've answered your own question. You're operating on a stream of items, in the order they're emitted. So as each one is emitted, it gets passed down the operator chain and you see the output you've shown here.

The alternative output you're expecting requires the chain to wait until the source has stopped emitting items for all groups. Say you had Observable.just(0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0). Then you'd expect (0, 3, 0), (1, 4, 4, 4, 4, 4, 4), (2) as your output groups. What if you had an infinite stream of 4s? The source would never complete, so your subscriber would never receive the 0, 3, ... from the first group.

You can create the behaviour you're looking for. The toList operator buffers the items of each group until the source completes, and then passes a single List<T> to the subscriber:

    .subscribe(observable -> {
        System.out.println("got observable " + observable + " for key " + observable.getKey());
        observable.toList().subscribe(items -> {
            // items is a List<Integer>
            System.out.println("key " + observable.getKey() + ", items " + items);
        });
    });
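
To see why buffering gives the grouped output, here is a rough plain-Java sketch of the same idea, assuming a finite source. It is not how toList is implemented in RxJava; `BufferedGroupsSketch` and `bufferByKey` are hypothetical names for this example. It uses the sequence 0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0 from above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BufferedGroupsSketch {

    // Collects every item under its key. Nothing is handed to the caller
    // until the whole source has been consumed, so the source must be finite.
    static Map<Integer, List<Integer>> bufferByKey(List<Integer> source) {
        // LinkedHashMap keeps the keys in the order the groups first appeared.
        Map<Integer, List<Integer>> buffers = new LinkedHashMap<>();
        for (Integer item : source) {
            buffers.computeIfAbsent(item % 3, key -> new ArrayList<>()).add(item);
        }
        return buffers;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> groups =
                bufferByKey(Arrays.asList(0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0));
        groups.forEach((key, items) ->
                System.out.println("key " + key + ", items " + items));
    }
}
```

The loop has to finish before any list is returned, which is the trade-off described above: grouped output costs you the ability to react to items as they arrive, and it never produces anything for a source that does not end.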
Adam S answered Oct 04 '22 19:10
