I was under the impression that combineLatest was a good fit at first, but as I read the docs it seems it isn’t: “Be aware that combineLatest will not emit an initial value until each observable emits at least one value.”… And of course I hit exactly that caveat. I tried forkJoin and merge in various combinations, but I can’t get it right.
The use case is pretty straightforward: the method someObs returns an array of 0 or more SomeObs objects, over which I loop. Based on a value on each SomeObs object I push a newly constructed observable of OtherObs[] to an Array of Observable<OtherObs[]>. This array "needs" to be merged into a single observable, and before returning it I’d like to do some transformations.
Specifically, I’m having difficulty replacing the combineLatest operator with something appropriate…
public obs(start: string, end: string): Observable<Array<OtherObs>> {
  return this.someObs().pipe(
    mergeMap((someObs: SomeObs[]) => {
      // Build one inner observable per SomeObs entry
      const othObs: Array<Observable<OtherObs[]>> = [];
      someObs.forEach((sobs: SomeObs) => {
        othObs.push(this.yetAnotherObs(sobs));
      });
      // Emits only once every inner observable has emitted at least once
      return combineLatest(othObs).pipe(
        map(arr => arr.reduce((acc, cur) => acc.concat(cur)))
      );
    })
  );
}

private yetAnotherObs(sobs: SomeObs): Observable<OtherObs[]> {
  /// ...
}

private someObs(): Observable<SomeObs[]> {
  /// ...
}
We can use the concat operator to take multiple Observables and return a new Observable that sequentially emits the values from each Observable that was passed in. It works by subscribing to them one at a time, in order, and forwarding each one's values to the output Observable.
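A minimal sketch of that sequential behaviour, using synchronous of() sources. The names first$, second$ and concatResult are illustrative only and do not come from the question.

```typescript
import { concat, of } from 'rxjs';

// Two illustrative sources (hypothetical names)
const first$ = of(1, 2);
const second$ = of(3, 4);

const concatResult: number[] = [];

// concat subscribes to second$ only after first$ has completed
concat(first$, second$).subscribe(value => concatResult.push(value));

console.log(concatResult); // [ 1, 2, 3, 4 ]
```

Because the second source is not subscribed until the first completes, concat is a poor fit when the first source may stay open indefinitely.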
The combineLatest operator makes the resulting Observable start emitting only once every inner Observable has emitted at least once. The forkJoin operator makes the resulting Observable emit only once every inner Observable has completed, and it then emits the last value from each.
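The difference can be seen side by side in a small sketch, assuming two synchronous sources. The names a$, b$, combined and joined are made up for illustration.

```typescript
import { combineLatest, forkJoin, of } from 'rxjs';

// Illustrative sources (hypothetical names)
const a$ = of(1, 2);
const b$ = of('x', 'y');

const combined: Array<[number, string]> = [];
const joined: Array<[number, string]> = [];

// combineLatest emits on every change once both sources have a value.
// a$ runs to completion before b$ is subscribed, so only its last value (2) is paired.
combineLatest([a$, b$]).subscribe(pair => combined.push(pair));

// forkJoin waits for both sources to complete and emits the last values once.
forkJoin([a$, b$]).subscribe(pair => joined.push(pair));

console.log(combined); // [ [ 2, 'x' ], [ 2, 'y' ] ]
console.log(joined);   // [ [ 2, 'y' ] ]
```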
The CombineLatest operator behaves in a similar way to Zip, but while Zip emits items only when each of the zipped source Observables has emitted a previously unzipped item, CombineLatest emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least ...
It will never complete if any of the inner streams doesn't complete. On the other hand, if any stream completes without emitting a value, the resulting stream will complete at the same moment without emitting anything, since it is now impossible to include a value from the completed input stream in the resulting sequence.
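As a small sketch of the "completes without emitting" case, the snippet below combines a normal source with EMPTY, which completes immediately and never emits. The names values and completed are illustrative.

```typescript
import { combineLatest, EMPTY, of } from 'rxjs';

const values: unknown[] = [];
let completed = false;

// EMPTY never provides a value, so no combined array can ever be built
combineLatest([of(1, 2, 3), EMPTY]).subscribe({
  next: value => values.push(value),
  complete: () => { completed = true; },
});

console.log(values);    // []
console.log(completed); // true
```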
The "problem" with combineLatest is (as you said) that it "will not emit an initial value until each observable emits at least one value". But this is not really a problem, because you can use the RxJS operator startWith. That way your observable gets an initial value and combineLatest works like a charm ;)
import { of, combineLatest } from 'rxjs';
import { delay, map, startWith } from 'rxjs/operators';

// Delay to show that this observable needs some time
const delayedObservable$ = of([10, 9, 8]).pipe(delay(5000));

// First observable
const observable1$ = of([1, 2, 3]);

// Second observable that starts with an empty array if there is no data yet
const observable2$ = delayedObservable$.pipe(startWith([]));

// Combine observable1$ and observable2$ (passed as an array; the
// multi-argument form is deprecated in newer RxJS versions)
combineLatest([observable1$, observable2$])
  .pipe(
    map(([one, two]) => {
      // Because we start with an empty array, we already get data from the beginning
      // After 5 seconds we also get data from observable2$
      console.log('observable1$', one);
      console.log('observable2$', two);
      // Concat only to show that we map both arrays to one array
      return one.concat(two);
    })
  )
  .subscribe(data => {
    // After 0 seconds: [ 1, 2, 3 ]
    // After 5 seconds: [ 1, 2, 3, 10, 9, 8 ]
    console.log(data);
  });
See example on Stackblitz
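Applied to the question's situation (an array of 0 or more Observable<OtherObs[]>), the same idea could look roughly like the sketch below. The helper name combineArrays, the Subject late$ and the sample numbers are hypothetical and only stand in for the question's types. The empty-array guard is there because combineLatest([]) completes without emitting anything.

```typescript
import { combineLatest, Observable, of, Subject } from 'rxjs';
import { map, startWith } from 'rxjs/operators';

// Hypothetical helper: merges many array-emitting observables into one flat array
function combineArrays<T>(sources: Array<Observable<T[]>>): Observable<T[]> {
  if (sources.length === 0) {
    // Nothing to combine: emit a single empty array instead of nothing at all
    return of([] as T[]);
  }
  // Seed every source with [] so combineLatest can emit right away
  const seeded = sources.map(source => source.pipe(startWith([] as T[])));
  return combineLatest(seeded).pipe(
    map(arrays => arrays.reduce((acc, cur) => acc.concat(cur), [] as T[]))
  );
}

// Usage: late$ stands in for a source that has not emitted yet
const late$ = new Subject<number[]>();
const results: number[][] = [];

combineArrays([of([1, 2]), late$, of([3])]).subscribe(flat => results.push(flat));
late$.next([10, 9]);

console.log(results); // [ [ 1, 2 ], [ 1, 2, 3 ], [ 1, 2, 10, 9, 3 ] ]
```

In the question's obs method, a helper like this would take the place of the combineLatest(othObs) call. Note that subscribers then see intermediate, partially filled arrays, which may or may not be acceptable for the transformations that follow.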