
Changing observable stream while keeping subscription

In RxJS, I want a subscription to persist even when the stream it is attached to is changed. Below, I used an interval stream to test the behaviour:

//Works because foo$ is unchanged
let foo$ = Rx.Observable.interval(1000);
foo$.subscribe(x => console.log(`foo$: ${x}`));

//Doesn't work because bar$ is reassigned after subscribing;
//the subscription stays attached to the original never() stream
let bar$ = Rx.Observable.never();
bar$.subscribe(x => console.log(`bar$: ${x}`));
bar$ = Rx.Observable.interval(1000);

jsbin Live Demo

How do I persist the subscription while changing the bar$ stream? Do I have to dispose of the subscription and create another one after I change bar$?

ckwong asked Apr 25 '16 15:04



1 Answer

The general pattern in Rx is to replace state mutation with a stream of values (i.e. an observable). Here, rather than reassigning bar$, bar$ should be modeled as an Observable<Observable<T>> (that is, a stream of streams of values of type T). It can then be "flattened out" into a stream of values (in this case, using switch).

For example:

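// bar$ is a stream of streams: each value pushed into it is itself an observable
const bar$ = new Rx.Subject();
// switch() forwards values from the most recent inner observable only
bar$.switch().subscribe(x => console.log(`bar$: ${x}`));
// "Changing the stream" now means pushing a new inner observable
bar$.onNext(Rx.Observable.fromArray([1,2,3]));
bar$.onNext(Rx.Observable.interval(1000).take(3));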

https://jsbin.com/firoso/edit?js,console,output
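To make the switching behaviour concrete, here is a minimal, dependency-free sketch of the idea. It is not how RxJS implements switch; `createManualSource`, `createSwitcher` and `switchTo` are hypothetical names invented for this illustration. It assumes a "source" is just a function that takes a callback and returns an unsubscribe function.

```javascript
// Hypothetical helper: a source whose values are pushed by hand via emit().
function createManualSource() {
  const listeners = new Set();
  // Subscribing registers the callback and returns a teardown function.
  const source = (next) => {
    listeners.add(next);
    return () => listeners.delete(next);
  };
  source.emit = (value) => listeners.forEach((listener) => listener(value));
  return source;
}

// Hypothetical helper: keeps one downstream subscriber while the upstream is swapped.
function createSwitcher() {
  let downstream = null;       // the single, persistent subscriber
  let unsubscribeInner = null; // teardown for the currently active source

  return {
    subscribe(next) {
      downstream = next;
    },
    // Roughly analogous to pushing a new inner observable into bar$:
    // detach from the previous source, then attach to the new one.
    switchTo(source) {
      if (unsubscribeInner) unsubscribeInner();
      unsubscribeInner = source((value) => {
        if (downstream) downstream(value);
      });
    },
  };
}

const received = [];
const bar = createSwitcher();
bar.subscribe((x) => received.push(x)); // subscribe once

const first = createManualSource();
const second = createManualSource();

bar.switchTo(first);
first.emit('a');       // delivered: first is the active source
bar.switchTo(second);  // first is unsubscribed here
first.emit('ignored'); // dropped: first is no longer active
second.emit('b');      // delivered: second is the active source

console.log(received); // [ 'a', 'b' ]
```

The subscriber is registered exactly once; only the upstream changes, which is the same separation the Subject plus switch approach gives you.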

Matt Burnell answered Sep 17 '22 04:09
