The following code:
Observable
.just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
.doOnNext(item -> System.out.println("source emitting " + item))
.groupBy(item -> {
System.out.println("groupBy called for " + item);
return item % 3;
})
.subscribe(observable -> {
System.out.println("got observable " + observable + " for key " + observable.getKey());
observable.subscribe(item -> {
System.out.println("key " + observable.getKey() + ", item " + item);
});
});
leaves me perplexed. The output I get is:
source emitting 0
groupBy called for 0
got observable rx.observables.GroupedObservable@42110406 for key 0
key 0, item 0
source emitting 1
groupBy called for 1
got observable rx.observables.GroupedObservable@1698c449 for key 1
key 1, item 1
source emitting 2
groupBy called for 2
got observable rx.observables.GroupedObservable@5ef04b5 for key 2
key 2, item 2
source emitting 3
groupBy called for 3
key 0, item 3
source emitting 4
groupBy called for 4
key 1, item 4
source emitting 5
groupBy called for 5
key 2, item 5
source emitting 6
groupBy called for 6
key 0, item 6
source emitting 7
groupBy called for 7
key 1, item 7
source emitting 8
groupBy called for 8
key 2, item 8
source emitting 9
groupBy called for 9
key 0, item 9
So, in the top level subscribe method, I get 3 observables from the GroupedObservable, as expected. Then, one by one, I subscribe to the grouped observables - and here the thing I don't understand:
Why are the original items still emitted in the original sequence (i.e. 0, 1, 2, 3, ...) and not 0, 3, 6, 9 ... for key 0, followed by 1, 4, 7 for key 1, followed by 2, 5, 8 for key 2?
I think I understand how the groups are created:
1. 0 is emitted, the key function is called and it gets 0
2. it is checked if an observable for 0 exists, it doesn't, so a new one is created and emitted, and then it emits 0
3. the same happens for source items 1 and 2 as they both create new groups, and observables with key 1 and 2 are emitted, and they emit 1 and 2 correspondingly
4. source item 3 is emitted, the key function is called and it gets 0
5. it is checked if an observable for 0 exists, it does -> no new grouped observable is created nor emitted, but 3 is emitted by the already existing observable
6. etc. until the source sequence is drained
It seems that although I get the grouped observables one by one, their emissions are somehow interleaved. How does this happen?
Why are the original items still emitted in the original sequence (i.e. 0, 1, 2, 3, ...) and not 0, 3, 6, 9 ... for key 0, followed by 1, 4, 7 for key 1, followed by 2, 5, 8 for key 2?
You've answered your own question. You're operating on a stream of items, in the order they're emitted. So as each one is emitted, it gets passed down the operator chain and you see the output you've shown here.
The alternative output you're expecting there requires the chain to wait until the source has stopped emitting items for all groups. Say you had Observable.just(0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0)
. Then you'd expect (0, 3, 0), (1, 4, 4, 4, 4, 4, 4), (2) as your output groups. What if you had an infinite stream of 4's? Your subscriber would never receive that 0, 3.. from the first group.
You can create the behaviour you're looking for. The toList
operator will cache output until the source completes, and then pass a List<R>
to the subscriber:
.subscribe(observable -> {
System.out.println("got observable " + observable + " for key " + observable.getKey());
observable.toList().subscribe(items -> {
// items is a List<Integer>
System.out.println("key " + observable.getKey() + ", items " + items);
});
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With