I need to fetch IDs from my state as separate Observables, combine all of them into one Observable, and subscribe() to it.
I need my subscriber to eventually get an array of resolved Observable results.
I also need to keep this subscription open, so that if one of my inner Observables changes, my subscriber is notified.
This is my code:
getTransactionsByIDs(transactionsIDs) {
  // Wraps the array of inner Observables in a single emission without subscribing to them
  return Observable.of(transactionsIDs
    .map(transactionID => this.getTransactionByID(transactionID)));
}

this.transactionsService.getTransactionsByIDs(transactionsIDs)
  .subscribe(transactions => {
    ....
  })
The result of the above code in the subscriber function is an array of unresolved stores.
How can I resolve each store and combine all of them together?
I've also tried using Observable.from() on the transactionsIDs to turn each ID into an Observable and then resolve it. It works fine, but then my subscriber gets notified for every ID separately. If there's a way to batch all of the Observable.from() results (while keeping the subscription open), please let me know.
This is what my Observable.from() version looks like:
getTransactionsByIDs(transactionsIDs) {
  // transactionsIDs is already an Observable of IDs here (see the call below)
  return transactionsIDs
    .mergeMap(transactionID => this.getTransactionByID(transactionID));
}

this.transactionsService.getTransactionsByIDs(Observable.from(transactionsIDs))
  .subscribe(transactions => {
    ....
  })
I think that what you want is combineLatest. It won't emit any value until every inner observable has emitted at least one value. After that, it emits an array with the latest value from each of them every time any one of the inner observables emits.
Here is some reading material: https://www.learnrxjs.io/operators/combination/combinelatest.html
Here is an example:
function getTransactionByID(transactionId) {
  let count = 0;
  // Simulates a transaction that keeps changing: re-emits after a random delay
  return Rx.Observable.of(transactionId)
    .delay(Math.random() * 4000)
    .map(x => `${x}: ${count++} `)
    .repeat();
}

function getTransactionsByIDs(transactionsIDs) {
  // One inner Observable per ID, combined into a single Observable of arrays
  return Rx.Observable.combineLatest(
    transactionsIDs.map(transactionID => getTransactionByID(transactionID))
  );
}

const transactionsIDs = [1, 2, 3];
getTransactionsByIDs(transactionsIDs)
  .take(10)
  .subscribe(x => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>
The take(10) is just to keep the example from going on forever.
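To make the emission rule described in this answer concrete (nothing until every inner observable has emitted once, then the latest value of each on every change), here is a minimal plain-JavaScript sketch of that behaviour. It is not the RxJS implementation: createSource and combineLatestSketch are hypothetical helpers written only for illustration, and the 'tx…' strings are made-up values.

```javascript
// Hand-rolled sketch of the combineLatest emission rule (NOT the RxJS source).
// createSource() is a hypothetical minimal "observable": subscribe(fn) registers
// a listener and next(value) pushes a value to every registered listener.
function createSource() {
  const listeners = [];
  return {
    subscribe(fn) { listeners.push(fn); },
    next(value) { listeners.forEach(fn => fn(value)); },
  };
}

// Calls onNext with an array of the latest values, one slot per source.
function combineLatestSketch(sources, onNext) {
  const latest = new Array(sources.length);
  const hasValue = new Array(sources.length).fill(false);
  let ready = 0; // number of sources that have emitted at least once

  sources.forEach((source, index) => {
    source.subscribe(value => {
      latest[index] = value;
      if (!hasValue[index]) {
        hasValue[index] = true;
        ready++;
      }
      // Emit a copy of the array only once every source has a value.
      if (ready === sources.length) {
        onNext(latest.slice());
      }
    });
  });
}

// Usage: three made-up transaction sources.
const a = createSource();
const b = createSource();
const c = createSource();
const received = [];
combineLatestSketch([a, b, c], values => received.push(values));

a.next('tx1 v0'); // nothing emitted yet
b.next('tx2 v0'); // still waiting for c
c.next('tx3 v0'); // first emission: all three have a value
a.next('tx1 v1'); // second emission: only the first slot changed

console.log(received);
```

The subscriber always receives the whole array, which is why this fits the "batch of resolved results that stays open" requirement better than mergeMap, which forwards each inner value on its own.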