How can I split a never ending stream into multiple ending streams based on a grouping method?
--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>
into these observables
--a--a-a-a-a-|
b---b-b--b-|
c-c---c-c-|
d-d-d-|
e...>
As you can see, the a
is at the beginning, and after I receive b
, i will no longer get a
so it should be ended. That's why the normal groupBy
is not good.
You can use window
and share
the source Observable. There's also a little trick with bufferCount(2, 1)
:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();
source
.bufferCount(2, 1) // delay emission by one item
.map(arr => arr[0])
.window(source
.bufferCount(2, 1) // keep the previous and current item
.filter(([oldValue, newValue]) => oldValue !== newValue)
)
.concatMap(obs => obs.toArray())
.subscribe(console.log);
This prints (because of toArray()
):
[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]
The problem with this solution is the order of subscriptions to source
. We need the window
notifier to subscribe before the first bufferCount
. Otherwise an item is first pushed further and then is checked whether it's different than the previous one with .filter(([oldValue, newValue]) ...)
.
This means that be need to delay emission by one before window
(that's the first .bufferCount(2, 1).map(arr => arr[0])
.
Or maybe it's easier to control the order of subscriptions myself with publish()
:
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();
const connectable = source.publish();
connectable
.window(source
.bufferCount(2, 1) // keep the previous and current item
.filter(([oldValue, newValue]) => oldValue !== newValue)
)
.concatMap(obs => obs.toArray())
.subscribe(console.log);
connectable.connect();
The output is the same.
Maybe someone can come up with something simpler but this works (fiddle: https://fiddle.jshell.net/uk01njgc/) ...
let counter = 0;
let items = Rx.Observable.interval(1000)
.map(value => Math.floor(value / 3))
.publish();
let distinct = items.distinctUntilChanged()
.publish();
distinct
.map(value => {
return items
.startWith(value)
.takeUntil(distinct);
})
.subscribe(obs => {
let obsIndex = counter++;
console.log('New observable');
obs.subscribe(
value => {
console.log(obsIndex.toString() + ': ' + value.toString());
},
err => console.log(err),
() => console.log('Completed observable')
);
});
distinct.connect();
items.connect();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With