Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS combineLatest without waiting for source observables to emit?

I have two source observables from where I need to calc some data as soon as one source observable emits. I'm trying to use the combineAll() operator but it only emits a value when each of the source observables emits for the first time.

Is there any operator similar to combineAll() that emits as soon as any of the source observables emits for the first time? If not, what's the clearest way of doing it?

What I've tried:

const source1$ = service.getSomeData(); const source2$ = service.getOtherData();  combineLatest(   source1$,   source2$ ).pipe(   map([source1Data, source2Data] => {     // this code only gets executed when both observables emits for the first time     return source1Data + source2Data;   }) ) 
like image 423
Elias Garcia Avatar asked Jan 04 '19 12:01

Elias Garcia


People also ask

Does combineLatest wait?

I call combineLatest operator the independent operator. They are independent and don't wait for each other.

Does combineLatest complete?

combineLatest operator returns an Observable that completes when all of the observables passed in as parameters to combineLatest complete.

What is the difference between combineLatest and forkJoin?

combineLatest is similar to forkJoin, except that it combines the latest results of all the observables and emits the combined final value. So until each observable is completed, the subscription block emits the result.

Does combineLatest subscribe?

combineLatest combines the values from all the Observables passed as arguments. This happens by subscribing to each Observable in order and, whenever any Observable emits, collecting an array of the most recent values from each Observable.


1 Answers

If I understand correctly you want a pattern like the following diagram:

stream1$ => ------ 1 ------ 12 ----------------------- stream2$ => ------------------------- 30 -------------  result$  => ------ 1 ------ 12 ------ 42 -------------- 

If one value is available, emit that. If both are available, emit the combination of both, a simple sum in this case (12 + 30 = 42);

First the input streams, I've made them subjects for the sake of this example, so we can push data in manually:

const stream1$ = new Subject(); const stream2$ = new Subject(); 

Next we'll combine the inputs, first piped through the startWith operator. This makes sure that combineLatest produces an observable that emits immediately - [null, null] to be precise.

const combined$ = combineLatest(   stream1$.pipe(startWith(null)),   stream2$.pipe(startWith(null)), ); 

Now you have an observable that always emits arrays of length 2, containing any combination of your data (numbers in this example) and null, like the following diagram:

stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ---------------------------- stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------  combined$                     [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] ------- 

Finally you can inspect and map this output to your desired format: the sum of 2 numbers if both are available, or the first value to be available:

const processedCombinations$ = combined$.pipe(   map(([data1, data2]) => {     if (data1 === null) return data2;     if (data2 === null) return data1;      return data1 + data2;   }), ); 

Result:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] ------- processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 ------------- 

One problem remains: the first value emitted from combined$ is [null, null], causing processedCombinations$ to emit null initially. One way to fix this is to chain another pipe using skipWhile onto processedCombinations$:

const final$ = processedCombinations$.pipe(skipWhile((input) => input === null)); 

Result:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] ------- processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 ------------- final$                     => ---------------- 1 ----------- 12 ----------- 42 ------------- 

Another - imo better - way is to filter the combined$ stream before processedCombinations$ (now actually final$) is created from it:

const combinedFiltered$ = combined$.pipe(     filter(([first, second])=> first !== null || second !== null), );  const final$ = combinedFiltered$.pipe(     map(([data1, data2]) => {         if (data1 === null) return data2;         if (data2 === null) return data1;          return data1 + data2;     }), ); 

A corresponding diagram shows nicely how irrelevant values are eliminated as early in the stream hierarchy as possible:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] ------- combinedFiltered$          => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] ------- final$                     => ---------------- 1 ----------- 12 ----------- 42 ------------- 

The above diagrams can be produced with this code:

final$.subscribe(console.log);  stream1$.next(1); // logs: 1  stream1$.next(12); // logs: 12  stream2$.next(30); // logs: 42 

Imports used:

import { combineLatest, Subject } from 'rxjs'; import { filter, map, skipWhile, startWith } from 'rxjs/operators'; 
like image 185
JJWesterkamp Avatar answered Oct 19 '22 21:10

JJWesterkamp