
a variation of combineLatest that completes when the first of the observables passed as parameters completes

The combineLatest operator returns an Observable that completes when all of the observables passed in as parameters to combineLatest complete.

Is there a way to create an Observable that behaves like the one returned by combineLatest, with the only difference that it completes as soon as the first of the Observables passed as parameters completes?

Picci asked Apr 25 '18 06:04



2 Answers

Yep, you can; you can do something like this:

function combineLatestUntilFirstComplete(...args) {
  const shared = args.map(a => a.share());
  return Rx.Observable
    .combineLatest(...shared)
    .takeUntil(Rx.Observable.merge(...shared.map(s => s.last())));
}

const a = Rx.Observable.interval(100).map(index => `a${index}`).take(5);
const b = Rx.Observable.interval(200).map(index => `b${index}`).take(5);

combineLatestUntilFirstComplete(a, b).subscribe(
  value => console.log(JSON.stringify(value)),
  error => console.error(error),
  () => console.log("complete")
);
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

The implementation takes values from the observable returned from the internal call to combineLatest until one of the source observables emits its last value.

Note that the source observables are shared, so that the extra subscriptions made by the takeUntil call don't cause a second subscription to each cold source observable.
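To make the intended semantics concrete without depending on a particular RxJS version, here is a minimal from-scratch sketch. It is not the implementation above and not how RxJS works internally: it assumes a hypothetical "source" shape (a plain function that accepts an observer with next and complete callbacks), and createSubject is an illustrative helper rather than an RxJS API. Unsubscription and error handling are left out.

```javascript
// Hypothetical source shape: source(observer), where observer = { next, complete }.
function combineLatestUntilFirstComplete(...sources) {
  return observer => {
    const latest = new Array(sources.length);
    const hasValue = sources.map(() => false);
    let done = false;

    // Complete the combined stream once, on the first source completion.
    const finish = () => {
      if (done) return;
      done = true;
      observer.complete();
    };

    sources.forEach((source, i) => {
      if (done) return; // an earlier source already completed synchronously
      source({
        next: value => {
          if (done) return; // ignore values that arrive after completion
          latest[i] = value;
          hasValue[i] = true;
          // Emit only once every source has produced at least one value.
          if (hasValue.every(Boolean)) observer.next(latest.slice());
        },
        complete: finish
      });
    });
  };
}

// Illustrative helper (not an RxJS API): a source that is driven by hand.
function createSubject() {
  const observers = [];
  const source = observer => { observers.push(observer); };
  source.next = value => observers.forEach(o => o.next(value));
  source.complete = () => observers.forEach(o => o.complete());
  return source;
}

const a = createSubject();
const b = createSubject();
const log = [];

combineLatestUntilFirstComplete(a, b)({
  next: value => log.push(JSON.stringify(value)),
  complete: () => log.push("complete")
});

a.next("a0");  // nothing emitted yet: b has no value
b.next("b0");  // emits ["a0","b0"]
a.next("a1");  // emits ["a1","b0"]
a.complete();  // first completion ends the combined stream
b.next("b1");  // ignored

console.log(log);
```

The key point is the single finish function shared by every source: whichever source completes first ends the combined stream, which is the behavior the takeUntil-based version obtains from operators.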

cartant answered Sep 29 '22 09:09


const a = Rx.Observable.interval(400).share();
const b = Rx.Observable.interval(600).share();
const c = Rx.Observable.interval(1000).take(3).share();

const combined = Rx.Observable.combineLatest(a, b, c)
  .takeUntil(
    Rx.Observable.merge(
      a.ignoreElements().concat(Rx.Observable.of('fin-a')).do(console.log),
      b.ignoreElements().concat(Rx.Observable.of('fin-b')).do(console.log),
      c.ignoreElements().concat(Rx.Observable.of('fin-c')).do(console.log)
    )
  );

combined
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.js"></script>

You need to .share() the input observables because each one is subscribed to twice: once by .combineLatest() and once by the .takeUntil notifier that completes the observable stream.

I used .ignoreElements() inside the .takeUntil notifier to discard every value. Only when a source stream (a, b, or c) completes does .concat append a 'final' message, which signals .takeUntil to complete the subscription to .combineLatest. This also works if any of a, b, or c never emits a value.
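The "ignore all values, then emit one marker on completion" idea can be shown in isolation. The sketch below is a dependency-free illustration, not RxJS code: it assumes a hypothetical source shape (a function that accepts an observer with next and complete callbacks), and completionSignal is a made-up name standing in for the ignoreElements-plus-concat combination.

```javascript
// Hypothetical source shape: source(observer), where observer = { next, complete }.
// Rough analog of source.ignoreElements().concat(Rx.Observable.of(marker)).
function completionSignal(source, marker) {
  return observer => source({
    next: () => {},        // drop every value from the source
    complete: () => {      // on completion, emit the marker once, then complete
      observer.next(marker);
      observer.complete();
    }
  });
}

// A source that completes immediately without emitting anything.
const empty = observer => { observer.complete(); };

// A source that emits two values and then completes.
const two = observer => {
  observer.next(1);
  observer.next(2);
  observer.complete();
};

const seen = [];
const collector = { next: value => seen.push(value), complete: () => {} };

completionSignal(empty, "fin-empty")(collector);
completionSignal(two, "fin-two")(collector);

console.log(seen);
```

Because the marker is produced by the completion itself rather than by the last value, the signal fires even for a source that never emits, which is the case the last sentence above refers to.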

Mark van Straten answered Sep 29 '22 07:09