Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

combineLatest alternative to combine or merge Observables

I was under the impression combineLast was a good fit at first, but as I read the docs it seems it isn’t: “Be aware that combineLatest will not emit an initial value until each observable emits at least one value.”… And of course I hit exactly that exception. I tried forkJoin and merge in various combinations, but I can’t get it right.

The use-case is pretty straightforward, the method someObs returns 0 or more observables, over which I loop. Based on a value on the SomeObs object I push a newly constructed observable, OtherObs[], to an Array of Observable<OtherObs[]>. This array "needs" to be merged into a single observable, and before returning it I’d like to do some transformations.

Specifically I’m having difficulty replacing the combineLast operator with something appropriate…

public obs(start: string, end: string): Observable<Array<OtherObs>> {
  return this.someObs().pipe(
    mergeMap((someObs: SomeObs[]) => {
      let othObs: Array<Observable<OtherObs[]>> = [];

      someObs.forEach((sobs: SomeObs) => {
        othObs.push(this.yetAnotherObs(sobs));
      });

      return combineLatest<Event[]>(othObs).pipe(
        map(arr => arr.reduce((acc, cur) => acc.concat(cur)))
      );
    })
  );
}

private yetAnotherObs(): Observable<OtherObs[]> {
  /// ...
}

private somObs(): Observable<SomeObs[]> {
  /// ...
}
like image 274
Dalie Avatar asked Jul 02 '18 15:07

Dalie


People also ask

How do I combine multiple Observables into one?

We can use the concat operator to take multiple Observables and return a new Observable that sequentially emits values from each Observable that were passed in. It works by subscribing to them one at a time and merging the results in the output Observable.

What is the difference between combineLatest and forkJoin?

The combineLatest operator enforces a source Observable to start emitting only when all the inner Observables emit at least once. The forkJoin operator enforces a source Observable to start emitting only when all the inner Observables are complete.

What is the difference between ZIP and combineLatest?

The CombineLatest operator behaves in a similar way to Zip, but while Zip emits items only when each of the zipped source Observables have emitted a previously unzipped item, CombineLatest emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least ...

Does combineLatest complete?

It will never complete if any of the inner streams doesn't complete. On the other hand, if any stream does not emit value but completes, resulting stream will complete at the same moment without emitting anything, since it will be now impossible to include value from completed input stream in resulting sequence.


1 Answers

The "problem" of combineLatest is (like you said) that it "will not emit an initial value until each observable emits at least one value". But this is not a problem because you can use the RxJS operator startWith.

So your observable gets an initial value and combineLatest works like a charme ;)

import { of, combineLatest } from 'rxjs';
import { delay, map, startWith } from 'rxjs/operators';

// Delay to show that this observable needs some time
const delayedObservable$ = of([10, 9, 8]).pipe(delay(5000));

// First Observable
const observable1$ = of([1, 2, 3]);

// Second observable that starts with empty array if no data
const observable2$ = delayedObservable$.pipe(startWith([]));

// Combine observable1$ and observable2$
combineLatest(observable1$, observable2$)
  .pipe(
    map(([one, two]) => {
      // Because we start with an empty array, we already get data from the beginning
      // After 5 seconds we also get data from observable2$
      console.log('observable1$', one);
      console.log('observable2$', two);

      // Concat only to show that we map boths arrays to one array
      return one.concat(two);
    })
  )
  .subscribe(data => {
    // After 0 seconds: [ 1, 2, 3 ]
    // After 5 seconds: [ 1, 2, 3, 10, 9, 8 ]
    console.log(data);
  });

See example on Stackblitz

like image 119
Paul Avatar answered Nov 17 '22 12:11

Paul