In RxJS, I want the subscription to persist on the stream even when the stream is changed. Below I used an interval stream to test the behaviour:
//Works because foo$ is unchanged
let foo$ = Rx.Observable.interval(1000);
foo$.subscribe(x => console.log(`foo$: ${x}`));
//Doesn't work because bar$ is changed
let bar$ = Rx.Observable.never();
bar$.subscribe(x => console.log(`bar$: ${x}`))
bar$ = Rx.Observable.interval(1000);
jsbin Live Demo
How do I persist the subscription while changing the bar$ stream?
Do I have to dispose the subscription and set up another subscription after I change bar$?
The general pattern in Rx is to replace state mutation with a stream of values (i.e. an observable). Here, rather than reassigning bar$, bar$ should be modeled as an Observable<Observable<T>> (that is, a stream of streams of values of type T). It can then be "flattened out" into a stream of values (in this case, using switch).
For example:
const bar$ = new Rx.Subject();
bar$.switch().subscribe(x => console.log(`bar$: ${x}`));
bar$.onNext(Rx.Observable.fromArray([1,2,3]));
bar$.onNext(Rx.Observable.interval(1000).take(3));
https://jsbin.com/firoso/edit?js,console,output
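To make the "stream of streams" idea more concrete, here is a rough library-free sketch of what switch does conceptually: the subscriber stays attached to the outer object, and each time a new inner stream is pushed, the previous inner stream is unsubscribed. This is a toy model, not how RxJS implements it; createSwitch, manualSource and fromArray are hypothetical helpers written only for this illustration, and an "observable" here is assumed to be just a function that takes a callback and returns an unsubscribe function.

```javascript
// Toy model (NOT RxJS): an "observable" is a function that takes a callback
// and returns an unsubscribe function.

// Hypothetical helper: emits every array value synchronously on subscribe.
function fromArray(values) {
  return (next) => {
    values.forEach((v) => next(v));
    return () => {}; // nothing left to cancel
  };
}

// Hypothetical helper: a source we can push values into by hand.
function manualSource() {
  let listener = null;
  const source = (next) => {
    listener = next;
    return () => { listener = null; }; // unsubscribe
  };
  source.push = (v) => { if (listener) listener(v); };
  return source;
}

// Hypothetical helper: holds one outer subscriber and forwards values
// from the most recently supplied inner stream only.
function createSwitch() {
  let outerListener = null;
  let cancelInner = () => {};
  return {
    subscribe(next) { outerListener = next; },
    next(inner) {
      cancelInner(); // drop the previous inner stream
      cancelInner = inner((v) => { if (outerListener) outerListener(v); });
    },
  };
}

const received = [];
const bar = createSwitch();
bar.subscribe((x) => received.push(x)); // subscribe once, never again

const first = manualSource();
const second = manualSource();

bar.next(first);
first.push('a');
bar.next(second);       // switch: first is unsubscribed here
first.push('ignored');  // no longer forwarded
second.push('b');
bar.next(fromArray([1, 2, 3]));

console.log(received);
```

The point of the design is that the subscription is attached to the outer object, which never changes, so "changing the stream" becomes pushing a new inner stream rather than reassigning a variable.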