combineLatest
operator returns an Observable that completes when all of the observables passed in as parameters to combineLatest
complete.
Is there a way to create an Observable that behaves as the one returned by combineLatest
with the only difference that it completes when the first of the Observables passed as parameters complete?
combineLatest operator returns an Observable that completes when all of the observables passed in as parameters to combineLatest complete.
When any of the source Observables emits an item, CombineLatest combines the most recently emitted items from each of the other source Observables, using a function you provide, and emits the return value from that function.
combineLatest is similar to forkJoin, except that it combines the latest results of all the observables and emits the combined final value. So until each observable is completed, the subscription block emits the result.
combineLatest allows to merge several streams by taking the most recent value from each input observable and emitting those values to the observer as a combined output (usually as an array).
Yep, you can; you can do something like this:
function combineLatestUntilFirstComplete(...args) {
const shared = args.map(a => a.share());
return Rx.Observable
.combineLatest(...shared)
.takeUntil(Rx.Observable.merge(...shared.map(s => s.last())));
}
const a = Rx.Observable.interval(100).map(index => `a${index}`).take(5);
const b = Rx.Observable.interval(200).map(index => `b${index}`).take(5);
combineLatestUntilFirstComplete(a, b).subscribe(
value => console.log(JSON.stringify(value)),
error => console.error(error),
() => console.log("complete")
);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
The implementation takes values from the observable returned from the internal call to combineLatest
until one of the source observables emits its last value.
Note that the source observables are shared, so that the subscriptions due to the takeUntil
call don't effect secondary subscriptions to cold source observables.
const a = Rx.Observable.interval(400).share();
const b = Rx.Observable.interval(600).share();
const c = Rx.Observable.interval(1000).take(3).share();
const combined = Rx.Observable.combineLatest(a, b, c)
.takeUntil(
Rx.Observable.merge(
a.ignoreElements().concat(Rx.Observable.of('fin-a')).do(console.log),
b.ignoreElements().concat(Rx.Observable.of('fin-b')).do(console.log),
c.ignoreElements().concat(Rx.Observable.of('fin-c')).do(console.log)
)
);
combined
.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.js"></script>
You need to .share()
the input observables because you need them twice, once for the .combineLatest()
and once for the .takeUntil
to complete your observable stream.
I used the .ignoreElements()
in the .takeUntil
to ignore any values and only when the source stream (either a
,b
, or c
) complete .concat
a 'final' message to it to signal to the .takeUntil
to complete our subscription to the .combineLatest
. This also works if either a,b,c
do not emit any values.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With