
RxJS How to Wait For Two ReplaySubjects To Produce Values Before Continuing

I want to use forkJoin like this:

Observable.forkJoin(
    this.service1.dataSourceIsAReplaySubject,
    this.service2.dataSourceIsAnObservable)
.subscribe(values => {
    console.log('inside subscribe values is', values);
});

but the console.log never fires.

I'm guessing the ReplaySubject never completes, which is why forkJoin never stops waiting.

I've also tried the following, but it doesn't work either. I tried passing the ReplaySubject first and the Observable second, and vice versa.

Observable.forkJoin(
    Observable.from(this.service1.dataSourceIsAReplaySubject),
    this.service2.dataSourceIsAnObservable)
.subscribe(values => {
    console.log('inside subscribe values is', values);
});

I also tried calling the share method:

Observable.forkJoin(
    this.service1.dataSourceIsAReplaySubject.share(),
    this.service2.dataSourceIsAnObservable)
.subscribe(values => {
    console.log('inside subscribe values is', values);
});

What can I do to wait on either two ReplaySubjects or a mix of one ReplaySubject and one Observable?

IanT8 asked Dec 08 '22 20:12


2 Answers

As you noted yourself, the problem is that forkJoin only emits once all of its source streams complete. To use it, you would need to make sure that every stream completes in some fashion (first(), take(1), etc.).

A better option, though, might be zip or combineLatest, depending on your needs. The implementation looks similar:

Observable.zip(
    this.service1.dataSourceIsAReplaySubject,
    this.service2.dataSourceIsAnObservable
).subscribe(values => {
    console.log('inside subscribe values is', values);
});

// OR

Observable.combineLatest(
    this.service1.dataSourceIsAReplaySubject,
    this.service2.dataSourceIsAnObservable
).subscribe(values => {
    console.log('inside subscribe values is', values);
});

Breakdown

zip emits the two Observables in lock step: it pairs the first value of one with the first value of the other, the second with the second, and so on, so each source must emit before zip emits the next pair. This is useful, but it can be dangerous if one source emits much more rapidly than the other, since the unpaired values accumulate in an unbounded buffer behind the scenes.

combineLatest waits until both streams have emitted at least once; after that, any time either one emits, it emits again with the latest value from each stream.

There is also a third option, withLatestFrom. It functions similarly to combineLatest, with the caveat that it only emits when the first (source) stream emits; the other streams silently update their latest values without triggering an emission.
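To make the difference between the three concrete, here is a small dependency-free model in plain JavaScript. It is not RxJS itself, only a sketch of the pairing rules described above; the names timeline, modelZip, modelCombineLatest and modelWithLatestFrom are made up for illustration, and completion and errors are ignored.

```javascript
// One interleaved timeline of emissions from two hypothetical sources, "a" and "b".
const timeline = [
  { source: 'a', value: 1 },
  { source: 'a', value: 2 },
  { source: 'b', value: 'x' },
  { source: 'a', value: 3 },
  { source: 'b', value: 'y' },
];

// zip: pair the n-th value of "a" with the n-th value of "b".
// Values that have no partner yet wait in a buffer.
function modelZip(events) {
  const buffers = { a: [], b: [] };
  const out = [];
  for (const { source, value } of events) {
    buffers[source].push(value);
    if (buffers.a.length > 0 && buffers.b.length > 0) {
      out.push([buffers.a.shift(), buffers.b.shift()]);
    }
  }
  return out;
}

// combineLatest: once both sources have emitted, every emission from
// either side produces a pair of the latest values.
function modelCombineLatest(events) {
  const latest = {};
  const out = [];
  for (const { source, value } of events) {
    latest[source] = value;
    if ('a' in latest && 'b' in latest) {
      out.push([latest.a, latest.b]);
    }
  }
  return out;
}

// withLatestFrom (with "a" as the source stream): only emissions from "a"
// produce output; "b" just updates its latest value silently.
function modelWithLatestFrom(events) {
  const out = [];
  let latestB;
  let hasB = false;
  for (const { source, value } of events) {
    if (source === 'b') {
      latestB = value;
      hasB = true;
    } else if (hasB) {
      out.push([value, latestB]);
    }
  }
  return out;
}

console.log(modelZip(timeline));            // [[1, 'x'], [2, 'y']]
console.log(modelCombineLatest(timeline));  // [[2, 'x'], [3, 'x'], [3, 'y']]
console.log(modelWithLatestFrom(timeline)); // [[3, 'x']]
```

Note how zip still holds 3 in its buffer at the end, how combineLatest never sees 1 because "b" had not emitted yet, and how withLatestFrom drops every "a" value that arrives before the first "b".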

paulpdaniels answered Jan 25 '23 22:01


forkJoin won't emit a value until the observables passed to it complete; it then emits the last value from each of those observables.

You can ensure they complete by using the first operator:

Observable.forkJoin(
    this.service1.dataSourceIsAReplaySubject.first(),
    this.service2.dataSourceIsAnObservable.first())
.subscribe(values => {
    console.log('inside subscribe values is', values);
});
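
Why completing the sources makes the difference can be seen in a tiny dependency-free model. This is not RxJS, only a sketch under simplifying assumptions: replaySource, takeOne and forkJoinModel are hypothetical stand-ins for a ReplaySubject that replays one buffered value, for take(1) (which behaves like first() whenever a value arrives), and for forkJoin. Errors and unsubscription are left out.

```javascript
// Stand-in for a ReplaySubject: replays its buffered value to each new
// subscriber but never calls complete() on its own.
function replaySource(bufferedValue) {
  return {
    subscribe(observer) {
      observer.next(bufferedValue);
    },
  };
}

// Stand-in for take(1): forward the first value, then complete.
function takeOne(source) {
  return {
    subscribe(observer) {
      let done = false;
      source.subscribe({
        next(value) {
          if (done) return;
          done = true;
          observer.next(value);
          observer.complete();
        },
        complete() {
          if (done) return;
          done = true;
          observer.complete();
        },
      });
    },
  };
}

// Stand-in for forkJoin: remember the last value of each source and emit
// them together only after every source has completed.
function forkJoinModel(sources, onResult) {
  const last = new Array(sources.length);
  const hasValue = new Array(sources.length).fill(false);
  let completed = 0;
  sources.forEach((source, i) => {
    source.subscribe({
      next(value) {
        last[i] = value;
        hasValue[i] = true;
      },
      complete() {
        completed += 1;
        if (completed === sources.length && hasValue.every(Boolean)) {
          onResult(last);
        }
      },
    });
  });
}

const results = [];

// Sources that never complete: the callback is never invoked.
forkJoinModel([replaySource('a'), replaySource('b')],
  values => results.push(['raw', values]));

// The same sources limited to one value: both complete, so the callback fires.
forkJoinModel([takeOne(replaySource('a')), takeOne(replaySource('b'))],
  values => results.push(['takeOne', values]));

console.log(JSON.stringify(results)); // [["takeOne",["a","b"]]]
```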
cartant answered Jan 25 '23 22:01