Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to combine the results of multiple observable, RxJS?

I'm learning RxJS and trying to replace some code I did with promises before, but I'm struggling to find the right way to combine observables.

I have a code that calls multiple APIs and for each API call executes the same code. Also, after API calls are done I have some other code that is supposed to be executed only then and only once. I did this with Promises, I made multiple Promises and for each Promise I execute the same code, and on Promise.all() I just do the other code.

So I tried to do this with RxJS:

var apiCallIterator = ["id1", "id2", id3];
var apiCallObservable = from(apiCallIterator)
  .pipe(
    mergeMap(apiCallIterator => {
      return makeAnApiCall();
    })
   );

apiCallObservable.subscribe({
next: value => {
 // do some code for each API call
},
error: () => {
 // api call failed
}
})

This works fine for the first part of my task, it will execute the same code after each API call is done. But I can't find a way to get all results in one spot after all API calls are done.

Maybe I'm doing this wrong from the beginning, but this part seems to work fine for me.

like image 971
snoopy25 Avatar asked Jul 11 '18 16:07

snoopy25


People also ask

How do I combine multiple Observables?

combine multiple Observables into one by merging their emissions. You can combine the output of multiple Observables so that they act like a single Observable, by using the Merge operator.

Does combineLatest complete?

combineLatest operator returns an Observable that completes when all of the observables passed in as parameters to combineLatest complete.

What is the difference between combineLatest and forkJoin?

forkJoin - When all observables complete, emit the last emitted value from each. combineLatest - When any observable emits a value, emit the latest value from each.

What is FlatMap in RxJS?

The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.

How to get the latest value from multiple observables in RxJS?

Combinelatest operator joins multiple observable to create an observable. This newly formed observable takes the latest value of every connected observable and emits the latest result. Combining 3 observables and getting the latest value using this operator // RxJS import { of, combineLatest } from 'rxjs'; // 1.

How to merge multiple data streams in RxJS?

Let’s understand first what this method is… In RxJS combineLatest operator is used to merge multiple data streams (Observables). Combinelatest operator joins multiple observable to create an observable. This newly formed observable takes the latest value of every connected observable and emits the latest result.

What is the use of combinelatest in RxJS?

In RxJS combineLatest operator is used to merge multiple data streams (Observables). Combinelatest operator joins multiple observable to create an observable. This newly formed observable takes the latest value of every connected observable and emits the latest result. Combining 3 observables and getting the latest value using this operator

How to combine the output of multiple observables?

We can use the merge operator to combine the output of multiple Observables so that they act like one: 3.2. MergeDelayError The mergeDelayError method is the same as merge in that it combines multiple Observables into one, but if errors occur during the merge, it allows error-free items to continue before propagating the errors:


2 Answers

Most easily just add toArray():

from(apiCallIterator)
  .pipe(
    mergeMap(val => makeAnApiCall()),
    toArray(), // collect all results
  )
  .subscribe(allResults => ...); // allResults.forEach(...) to iterate all results

You could also use forkJoin but then you'd have to prepare an array of all Observables source beforehand. However, this is only a matter of your preference.

https://stackblitz.com/edit/rxjs6-demo-qbvhjh?file=index.ts

like image 165
martin Avatar answered Nov 14 '22 02:11

martin


Using forkJoin you can achieve this

forkJoin is an operator that takes any number of Observables which can be passed either as an array or directly as arguments. If no input Observables are provided, resulting stream will complete immediately.

forkJoin will wait for all passed Observables to complete and then it will emit an array with last values from corresponding Observables. So if you pass n Observables to the operator, resulting array will have n values, where first value is the last thing emitted by the first Observable, second value is the last thing emitted by the second Observable and so on.

like image 23
NullPointer Avatar answered Nov 14 '22 02:11

NullPointer