I have two source observables, and I need to compute some data as soon as either of them emits. I'm trying to use the combineLatest() operator, but it only emits a value once each of the source observables has emitted for the first time.
Is there any operator similar to combineLatest() that emits as soon as any of the source observables emits for the first time? If not, what's the clearest way of doing it?
What I've tried:
    const source1$ = service.getSomeData();
    const source2$ = service.getOtherData();

    combineLatest(
      source1$,
      source2$
    ).pipe(
      map(([source1Data, source2Data]) => {
        // this code only gets executed once both observables have emitted for the first time
        return source1Data + source2Data;
      })
    )
I call the combineLatest operator the independent operator. Once every source has emitted at least one value, the sources are independent and don't wait for each other: a new value from any of them triggers an emission.
combineLatest returns an Observable that completes when all of the observables passed in as parameters to combineLatest complete.
combineLatest is similar to forkJoin, except that forkJoin emits only once, with the last value of each observable, after all of them have completed, whereas combineLatest emits the combined latest values every time any source emits. So the subscription block receives results as they arrive, without waiting for each observable to complete.
combineLatest combines the values from all the Observables passed as arguments. This happens by subscribing to each Observable in order and, whenever any Observable emits, collecting an array of the most recent values from each Observable. Nothing is emitted until every Observable has emitted at least once, which is the behaviour described in the question.
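To make that emission rule concrete, here is a minimal sketch in plain TypeScript that models it without RxJS. The names `simulateCombineLatest` and `SourceEvent` are made up for illustration and are not part of the library; the real operator also deals with completion, errors and timing, which this model ignores.

```typescript
// Hypothetical model of the combineLatest emission rule (not the RxJS implementation).
type SourceEvent<T> = { source: number; value: T };

function simulateCombineLatest<T>(
  sourceCount: number,
  events: SourceEvent<T>[],
): T[][] {
  const latest: (T | undefined)[] = new Array<T | undefined>(sourceCount).fill(undefined);
  const hasEmitted: boolean[] = new Array<boolean>(sourceCount).fill(false);
  const output: T[][] = [];

  for (const { source, value } of events) {
    latest[source] = value;
    hasEmitted[source] = true;
    // Nothing is emitted until every source has produced at least one value.
    if (hasEmitted.every(Boolean)) {
      output.push(latest.slice() as T[]);
    }
  }
  return output;
}

// Same sequence as in the question: source 0 emits twice before source 1 emits once.
const emissions = simulateCombineLatest<number>(2, [
  { source: 0, value: 1 },
  { source: 0, value: 12 },
  { source: 1, value: 30 },
]);
console.log(emissions); // [[12, 30]]
```

The first two values produce no output at all in this model, which matches what the question observes.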
If I understand correctly you want a pattern like the following diagram:
    stream1$ => ------ 1 ------ 12 -----------------------
    stream2$ => ------------------------- 30 -------------
    result$  => ------ 1 ------ 12 ------ 42 -------------
If one value is available, emit that. If both are available, emit the combination of both, a simple sum in this case (12 + 30 = 42).
First the input streams, I've made them subjects for the sake of this example, so we can push data in manually:
    const stream1$ = new Subject();
    const stream2$ = new Subject();
Next we'll combine the inputs, each first piped through the startWith operator. This makes sure that combineLatest produces an observable that emits immediately: [null, null], to be precise.
    const combined$ = combineLatest(
      stream1$.pipe(startWith(null)),
      stream2$.pipe(startWith(null)),
    );
Now you have an observable that always emits arrays of length 2, containing any combination of your data (numbers in this example) and null, like the following diagram:
    stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
    stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------
    combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
Finally you can inspect and map this output to your desired format: the sum of the 2 numbers if both are available, or whichever single value is available so far:
    const processedCombinations$ = combined$.pipe(
      map(([data1, data2]) => {
        if (data1 === null) return data2;
        if (data2 === null) return data1;

        return data1 + data2;
      }),
    );
Result:
    combined$              => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
    processedCombinations$ => NULL ----------- 1 ----------- 12 ----------- 42 -------------
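If more than two inputs are ever needed, the null checks in the map callback could be generalised. The helper below is only a sketch: `sumAvailable` is a hypothetical name, not an RxJS function, and it assumes the values are numbers with null meaning "this stream has not emitted yet".

```typescript
// Hypothetical helper: sums whichever values have arrived so far.
// Returns null when no stream has emitted yet (every slot is still null).
function sumAvailable(values: Array<number | null>): number | null {
  const available = values.filter((v): v is number => v !== null);
  if (available.length === 0) return null;
  return available.reduce((total, v) => total + v, 0);
}

console.log(sumAvailable([null, null])); // null
console.log(sumAvailable([1, null]));    // 1
console.log(sumAvailable([12, 30]));     // 42
```

Since it takes the combined array as its only argument, it should be usable as the callback passed to map on combined$.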
One problem remains: the first value emitted from combined$ is [null, null], causing processedCombinations$ to emit null initially. One way to fix this is to chain another pipe using skipWhile onto processedCombinations$:
const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));
Result:
    combined$              => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
    processedCombinations$ => NULL ----------- 1 ----------- 12 ----------- 42 -------------
    final$                 => ---------------- 1 ----------- 12 ----------- 42 -------------
Another - imo better - way is to filter the combined$ stream before processedCombinations$ (now actually final$) is created from it:
    const combinedFiltered$ = combined$.pipe(
      filter(([first, second]) => first !== null || second !== null),
    );

    const final$ = combinedFiltered$.pipe(
      map(([data1, data2]) => {
        if (data1 === null) return data2;
        if (data2 === null) return data1;

        return data1 + data2;
      }),
    );
A corresponding diagram shows nicely how irrelevant values are eliminated as early in the stream hierarchy as possible:
    combined$         => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
    combinedFiltered$ => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
    final$            => ---------------- 1 ----------- 12 ----------- 42 -------------
The above diagrams can be produced with this code:
    final$.subscribe(console.log);

    stream1$.next(1);  // logs: 1
    stream1$.next(12); // logs: 12
    stream2$.next(30); // logs: 42
Imports used:
    import { combineLatest, Subject } from 'rxjs';
    import { filter, map, skipWhile, startWith } from 'rxjs/operators';
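For anyone who wants to sanity-check the logged values (1, 12, 42) without installing RxJS, the seeded pipeline can be replayed by hand. This is a rough model in plain TypeScript, not the library's behaviour in full: `replaySeededPipeline` and `StreamEvent` are hypothetical names, and completion, errors and timing are left out.

```typescript
// Hypothetical replay of: startWith(null) on both inputs -> combineLatest -> filter -> map.
type StreamEvent = { stream: 0 | 1; value: number };

function replaySeededPipeline(events: StreamEvent[]): number[] {
  // Both slots start as null, mirroring startWith(null) on each input.
  const latest: [number | null, number | null] = [null, null];
  const logged: number[] = [];

  // The initial [null, null] combination is dropped by the filter step,
  // so the replay starts with the first real event.
  for (const { stream, value } of events) {
    latest[stream] = value;
    const [data1, data2] = latest;

    // Same mapping as in the answer: whichever value exists, or the sum of both.
    const result =
      data1 === null ? data2 : data2 === null ? data1 : data1 + data2;
    if (result !== null) logged.push(result);
  }
  return logged;
}

const logged = replaySeededPipeline([
  { stream: 0, value: 1 },
  { stream: 0, value: 12 },
  { stream: 1, value: 30 },
]);
console.log(logged); // [1, 12, 42]
```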