Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS combineLatest: how to get emit after just one value changes?

I'm trying to learn the RxJS library. One of the cases I don't quite understand is described in this jsfiddle (code also below).

var A= new Rx.Subject();
var B= new Rx.Subject();

A.onNext(0);    

// '.combineLatest' needs all the dependency Observables to get emitted, before its combined signal is emitted.
//
// How to have a combined signal emitted when any of the dependencies change (using earlier given values for the rest)?
//    
A.combineLatest( B, function (a,b) { return a+b; } )
 .subscribe( function (v) { console.log( "AB: "+ v ); } );

B.onNext("a");  
A.onNext(1);

I'd like to get two emits to the "AB" logging. One from changing B to "a" (A already has the value 0). Another from changing A to 1.

However, only changes that occur after a subscribe seem to matter (even though A has a value and thus the combined result could be computed).

Should I use "hot observables" for this, or some other method than .combineLatest?

My problem in the actual code (bigger than this sample) is that I need to make separate initialisations after the subscribes, which cuts stuff in two separate places instead of having the initial values clearly up front.

Thanks

like image 830
akauppi Avatar asked Sep 27 '15 20:09

akauppi


People also ask

What is the difference between combineLatest and forkJoin?

forkJoin - When all observables complete, emit the last emitted value from each. combineLatest - When any observable emits a value, emit the latest value from each. Usage is pretty similar, but you shouldn't forget to unsubscribe from combineLatest unlike forkJoin.

Does combineLatest complete?

combineLatest operator returns an Observable that completes when all of the observables passed in as parameters to combineLatest complete.

What is the difference between ZIP and combineLatest?

The CombineLatest operator behaves in a similar way to Zip, but while Zip emits items only when each of the zipped source Observables have emitted a previously unzipped item, CombineLatest emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least ...

How can you use combineLatest operator in rxjs6?

The operator works in the following way: Subscribe to all input observables. When a source observable emits a value, override any existing value in the cache for this observable. If here's a cached value for each obsevable in the cache, emit the combined values to the observer.


1 Answers

I think you have misunderstood how the Subjects work. Subjects are hot Observables. They do not hold on to values, so if they receive an onNext with no subscribers than that value will be lost to the world.

What you are looking for is a either the BehaviorSubject or the ReplaySubject both of which hold onto past values that re-emit them to new subscribers. In the former case you always construct it with an initial value

//All subscribers will receive 0
var subject = new Rx.BehaviorSubject(0);

//All subscribers will receive 1
//Including all future subscribers
subject.onNext(1);

in the latter you set the number of values to be replayed for each subscription

var subject = new Rx.ReplaySubject(1);
//All new subscribers will receive 0 until the subject receives its 
//next onNext call
subject.onNext(0);

Rewriting your example it could be:

var A= new Rx.BehaviorSubject(0);
var B= new Rx.Subject();    

// '.combineLatest' needs all the dependency Observables to get emitted, before its combined signal is emitted.
//
// How to have a combined signal emitted when any of the dependencies change (using earlier given values for the rest)?
//    
A.combineLatest( B, function (a,b) { return a+b; } )
 .subscribe( function (v) { console.log( "AB: "+ v ); } );

B.onNext("a");  
A.onNext(1);

//AB: 0a
//AB: 1a

On another note, realizing of course that this is all new to you, in most cases you should not need to use a Subject directly as it generally means that you are trying to wrangle Rx into the safety of your known paradigms. You should ask yourself, where is your data coming from? How is it being created? If you ask those questions enough, following your chain of events back up to the source, 9 out of 10 times you will find that there is probably an Observable wrapper for it.

like image 156
paulpdaniels Avatar answered Nov 15 '22 02:11

paulpdaniels