
RxJs split stream into multiple streams

How can I split a never ending stream into multiple ending streams based on a grouping method?

--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>

into these observables

--a--a-a-a-a-|
             b---b-b--b-|
                        c-c---c-c-|
                                  d-d-d-|
                                        e...>

As you can see, a only appears at the beginning. Once I receive b, no more a values will arrive, so the a stream should complete. That's why the regular groupBy is not a good fit: it keeps each group open until the source itself completes.
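To make the intended grouping rule concrete, here is a minimal sketch in plain JavaScript without RxJS. The helper name splitOnChange is hypothetical and not part of any library. It works on an iterable rather than an Observable, so it only illustrates where one group ends and the next begins:

```javascript
// Hypothetical helper: lazily yields one array per run of consecutive equal items.
function* splitOnChange(iterable) {
  let group = [];
  for (const item of iterable) {
    // A value different from the previous one closes the current group.
    if (group.length > 0 && item !== group[group.length - 1]) {
      yield group;
      group = [];
    }
    group.push(item);
  }
  // Flush the last group if the input ever ends.
  if (group.length > 0) yield group;
}

const input = 'a-a-a-b-b-c-c-d'.split('-');
const groups = [...splitOnChange(input)];
console.log(groups);
```

Because it is a generator, each group is handed out as soon as the next different value arrives, which is the behaviour the diagram above asks for.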

Gergely Fehérvári asked Oct 19 '17 18:10


2 Answers

You can use window and share the source Observable. There's also a little trick with bufferCount(2, 1):

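// Assumes RxJS 5 with patched (chainable) operators
const Rx = require('rxjs/Rx');
const Observable = Rx.Observable;

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
// share() lets the main chain and the window notifier use one subscription to the source
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

source
    .bufferCount(2, 1) // delay emission by one item
    .map(arr => arr[0])
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue) // emit only when the value changes
    )
    .concatMap(obs => obs.toArray()) // collect each window into an array
    .subscribe(console.log);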

This prints (because of toArray()):

[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]

The catch with this solution is the order of subscriptions to source. The window notifier would need to subscribe before the first bufferCount. Otherwise each item is pushed downstream first and only then checked by .filter(([oldValue, newValue]) ...) to see whether it differs from the previous one.

This means that we need to delay emission by one item before window (that's what the first .bufferCount(2, 1).map(arr => arr[0]) does).
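The effect of subscription order can be shown without RxJS. The sketch below uses a hand-rolled subject (createSubject and splitIntoWindows are hypothetical names, not RxJS APIs) that calls its listeners in the order they subscribed. It is only a simplified model of what window and its notifier do:

```javascript
// Minimal hand-rolled subject: calls listeners in subscription order.
function createSubject() {
  const listeners = [];
  return {
    subscribe(fn) { listeners.push(fn); },
    next(value) { listeners.forEach(fn => fn(value)); },
  };
}

function splitIntoWindows(values, notifierFirst) {
  const source = createSubject();
  const windows = [[]];
  let previous;
  let hasPrevious = false;

  // Appends each item to the currently open window.
  const collector = value => windows[windows.length - 1].push(value);

  // Opens a new window whenever the value differs from the previous one.
  const notifier = value => {
    if (hasPrevious && value !== previous) windows.push([]);
    previous = value;
    hasPrevious = true;
  };

  const order = notifierFirst ? [notifier, collector] : [collector, notifier];
  order.forEach(fn => source.subscribe(fn));
  values.forEach(value => source.next(value));
  return windows;
}

const items = 'a-a-b-b-c'.split('-');
const notifierFirstResult = splitIntoWindows(items, true);
const collectorFirstResult = splitIntoWindows(items, false);
console.log(notifierFirstResult);  // [ [ 'a', 'a' ], [ 'b', 'b' ], [ 'c' ] ]
console.log(collectorFirstResult); // [ [ 'a', 'a', 'b' ], [ 'b', 'c' ], [] ]
```

When the collector runs first, the first item of each new group lands in the previous window. That off-by-one is what the one-item delay compensates for.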

Or it may be easier to control the order of subscriptions explicitly with publish():

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

const connectable = source.publish();

connectable
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

// Subscribe the connectable to source only now, after the window notifier has already subscribed
connectable.connect();

The output is the same.

martin answered Oct 15 '22 14:10


Maybe someone can come up with something simpler but this works (fiddle: https://fiddle.jshell.net/uk01njgc/) ...

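let counter = 0; // index assigned to each inner observable

// Hot source: emits 0, 0, 0, 1, 1, 1, ... once connected
let items = Rx.Observable.interval(1000)
  .map(value => Math.floor(value / 3))
  .publish();

// Emits only when the value changes, i.e. at the start of each group
let distinct = items.distinctUntilChanged()
  .publish();

distinct
  .map(value => {
    // One inner observable per group: the changed value, then further items until the next change
    return items
      .startWith(value)
      .takeUntil(distinct);
  })
  .subscribe(obs => {
    let obsIndex = counter++;
    console.log('New observable');
    obs.subscribe(
      value => {
        console.log(obsIndex.toString() + ': ' + value.toString());
      },
      err => console.log(err),
      () => console.log('Completed observable')
    );
  });

// Connect distinct first so that it is subscribed to items before items starts emitting
distinct.connect();
items.connect();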

Pace answered Oct 15 '22 14:10