I want to use forkJoin like this:
Observable.forkJoin(
this.service1.dataSourceIsAReplaySubject,
this.service2.dataSourceIsAnObservable)
.subscribe(values => {
console.log('inside subscribe values is', values);
});
but the console.log never fires.
I'm guessing the replay subject never completes which is why forkJoin never stops waiting.
I've tried this, but this doesn't work either. I've tried the ReplaySubject first and the Observable second and vice versa.
Observable.forkJoin(
Observable.from(this.service1.dataSourceIsAReplaySubject),
this.service2.dataSourceIsAnObservable)
.subscribe(values => {
console.log('inside subscribe values is', values);
});
I also tried calling the share method:
Observable.forkJoin(
this.service1.dataSourceIsAReplaySubject.share(),
this.service2.dataSourceIsAnObservable)
.subscribe(values => {
console.log('inside subscribe values is', values);
});
What can I do to wait on either two ReplaySubjects or a mix of one ReplaySubject and one Observable?
As you yourself noted, the problem is that forkJoin only emits once all streams complete. If you were going to use it, you would need to make sure that all of your streams complete in some fashion (first(), take(1), etc.).
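To illustrate the point about completion, here is a minimal sketch. It assumes a recent RxJS (6.5+ or 7) with the pipeable API rather than the older `Observable.forkJoin` style used in the question, and `replay$` and `plain$` are hypothetical stand-ins for the two service data sources:

```typescript
import { ReplaySubject, forkJoin, of } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-ins for the two service data sources (assumption).
const replay$ = new ReplaySubject<string>(1);
replay$.next('cached');        // a value is available, but the subject never completes
const plain$ = of(1, 2, 3);    // emits three values, then completes

const withoutTake: unknown[][] = [];
const withTake: unknown[][] = [];

// This subscription stays silent because replay$ has not completed.
forkJoin([replay$, plain$]).subscribe(values => withoutTake.push(values));

// take(1) completes the replayed stream right after its first value,
// so forkJoin can emit the last value of each source.
forkJoin([replay$.pipe(take(1)), plain$]).subscribe(values => withTake.push(values));

console.log('without take(1):', withoutTake);
console.log('with take(1):', withTake);
```

Because everything here emits synchronously, the second array is filled by the time the `console.log` calls run; with real asynchronous sources the values would arrive later.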
A better option, though, might be to use either zip or combineLatest, depending on your needs. The implementation would look similar:
Observable.zip(
this.service1.dataSourceIsAReplaySubject,
this.service2.dataSourceIsAnObservable
).subscribe(values => {
console.log('inside subscribe values is', values);
});
// OR
Observable.combineLatest(
this.service1.dataSourceIsAReplaySubject,
this.service2.dataSourceIsAnObservable
).subscribe(values => {
console.log('inside subscribe values is', values);
});
Breakdown
zip emits values from the two Observables in lock step: each time one emits, the other must also emit before zip emits a value. This is useful, but it can also be dangerous if one emits much more rapidly than the other, since values from the faster stream pile up in an unbounded buffer behind the scenes.
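The lock-step pairing can be seen in a small sketch, assuming a recent RxJS (6 or 7); `fast$` and `slow$` are hypothetical subjects introduced only for illustration:

```typescript
import { Subject, zip } from 'rxjs';

// Hypothetical sources: one emits more often than the other (assumption).
const fast$ = new Subject<number>();
const slow$ = new Subject<string>();

const pairs: unknown[][] = [];
zip(fast$, slow$).subscribe(pair => pairs.push(pair));

fast$.next(1);     // buffered, waiting for slow$
fast$.next(2);     // buffered
fast$.next(3);     // buffered, and never paired in this sketch
slow$.next('a');   // pairs with 1
slow$.next('b');   // pairs with 2

console.log(pairs);
```

The value 3 stays in the internal buffer until `slow$` emits again, which is the growth risk described above.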
combineLatest will wait until both streams have emitted at least once; after that, any time one emits, combineLatest will emit with the latest value of the other stream.
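A short sketch of that behaviour, again assuming a recent RxJS (6.5+ or 7) and using hypothetical subjects `a$` and `b$`:

```typescript
import { Subject, combineLatest } from 'rxjs';

// Hypothetical sources used only for illustration (assumption).
const a$ = new Subject<number>();
const b$ = new Subject<string>();

const seen: unknown[][] = [];
combineLatest([a$, b$]).subscribe(values => seen.push(values));

a$.next(1);     // nothing yet: b$ has not emitted
a$.next(2);     // still nothing, but 2 is now the latest value of a$
b$.next('x');   // first emission: [2, 'x']
a$.next(3);     // [3, 'x']
b$.next('y');   // [3, 'y']

console.log(seen);
```

Note that the value 1 is never delivered, since only the latest value of each stream is kept.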
There is also a third option, called withLatestFrom. It functions similarly to combineLatest, with the caveat that it only emits when the first stream emits and silently updates all the other latest values without emitting.
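For comparison, a sketch of withLatestFrom under the same assumptions (recent RxJS with pipeable operators; `primary$` and `secondary$` are hypothetical names):

```typescript
import { Subject } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

// Hypothetical sources: only primary$ drives emissions (assumption).
const primary$ = new Subject<number>();
const secondary$ = new Subject<string>();

const out: unknown[][] = [];
primary$.pipe(withLatestFrom(secondary$)).subscribe(values => out.push(values));

primary$.next(1);       // dropped: secondary$ has no value yet
secondary$.next('x');   // stored silently, no emission
secondary$.next('y');   // replaces 'x', still no emission
primary$.next(2);       // emits [2, 'y']
secondary$.next('z');   // stored silently
primary$.next(3);       // emits [3, 'z']

console.log(out);
```

This fits cases where one stream is the trigger and the other only supplies context.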
forkJoin won't emit a value until the observables passed to it complete; it then emits the last value of each of those observables.
You can ensure they complete by using the first operator:
Observable.forkJoin(
this.service1.dataSourceIsAReplaySubject.first(),
this.service2.dataSourceIsAnObservable.first())
.subscribe(values => {
console.log('inside subscribe values is', values);
});