Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Merge results of array of Observables

I need to fetch IDs from my state as separate Observables, combine all of them into one Observable and subscribe() to it. I need that my subscriber will eventually get an array of resolved Observable results.

Also I need to keep this subscription open, so if one of my inner Observables changes my subscriber will be notified.

This is my code:

getTransactionsByIDs(transactionsIDs){
return Observable.of(transactionIDs
  .map(transactionID => this.getTransactionByID(transactionID)));
}

this.transactionsService.getTransactionsByIDs(transactionsIDs)
.subscribe(transactions=>{
 ....
})

The result of the above code in the subscriber function is an array of unresolved stores.

How can I resolve each store and combine all of them together?

I've also tried to use Observable.from() on the transactionsIDs to transform each ID into an Observable and then resolve it. It works fine, but then my subscriber getting notified on every ID separately. If there's a way to batch all of the Observable.from() results (while keeping subscription open), please let me know.

This is how my Observable.from() looks like:

getTransactionsByIDs(transactionsIDs){
return transactionIDs
  .mergeMap(transactionID => this.getTransactionByID(transactionID));
}

this.transactionsService.getTransactionsByIDs(Observable.from(transactionsIDs))
.subscribe(transactions=>{
 ....
})
like image 662
Gili Yaniv Avatar asked Jan 16 '18 19:01

Gili Yaniv


People also ask

How do I combine multiple Observables into one?

concat. We can use the concat operator to take multiple Observables and return a new Observable that sequentially emits values from each Observable that were passed in. It works by subscribing to them one at a time and merging the results in the output Observable.

Is observable an array?

Behind the scenes, an observableArray is actually an observable whose value is an array (plus, observableArray adds some additional features described below). So, you can get the underlying JavaScript array by invoking the observableArray as a function with no parameters, just like any other observable.


1 Answers

I think that what you want is combineLatest. It wont emit any value till all inner observables emit at least one value. After that it will emit the latest from all each time there is a new emit from one of the inner observables.

Here is some reading material: https://www.learnrxjs.io/operators/combination/combinelatest.html

Here is an example:

function getTransactionByID(transactionId) {
  let count = 0;
  return Rx.Observable.of(transactionId)
    .delay(Math.random() * 4000)
    .map(x => `${x}: ${count++} `)
    .repeat();
}

function getTransactionsByIDs(transactionsIDs){
  return Rx.Observable.combineLatest(transactionsIDs.map(transactionID => getTransactionByID(transactionID)));
}

const transactionsIDs = [1,2,3];
getTransactionsByIDs(transactionsIDs)
  .take(10)
  .subscribe(x => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>

The take(10) is just the keep the example from going on forever.

like image 174
bygrace Avatar answered Sep 20 '22 04:09

bygrace