I'm creating an array of asynchronous observables with Rx.Observable.create() and hope to use .toArray() to get all the values when they complete.
console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
  return Rx.Observable.create((obs)=>{
    let tid = setTimeout(()=>{
      console.log(val + ' timing out');
      obs.onNext(val);
    },i*500);
    return ()=>{
      clearTimeout(tid);
    };
  }).publish().refCount();
});
Rx.Observable.from(valsArray)
  .flatMap((v)=>v)
  .toArray()
  .subscribe((arr)=>{
    console.log("arr should be ['a','b','c']",arr);
  });
The example above is at http://jsbin.com/wegoha/10/edit?js,console.
I'm using setTimeout as a stand-in for other asynchronous operations to keep the example simple.
You can use concatAll() or mergeAll() without any parameter. This (including mergeMap) works only in RxJS 5+ because it treats Observables, arrays, array-like objects, Promises, etc. the same way.
RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array#extras (map, filter, reduce, every, etc.) to allow handling asynchronous events as collections.
Flattening operators come to our rescue when we have a nested subscription, i.e. subscribing to an observable within another subscription. Nested subscriptions can be pretty annoying to track and debug. It's similar to the "callback hell" scenario, where we have nested callbacks.
Pipes let you combine multiple functions into a single function. The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.
The code is correct except that you never complete the source observables.
The toArray() operator can only emit its array once the source observable completes, and since the observables built with Rx.Observable.create never call onCompleted, your query can never end.
Try this:
console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
  return Rx.Observable.create((obs)=>{
    let tid = setTimeout(()=>{
      console.log(val + ' timing out');
      obs.onNext(val);
      // Signal completion so that toArray() can emit the collected values.
      obs.onCompleted();
    },i*500);
    return ()=>{
      clearTimeout(tid);
    };
  }).publish().refCount();
});
Rx.Observable.from(valsArray)
  .flatMap((v)=>v)
  .toArray()
  .subscribe((arr)=>{
    console.log("arr should be ['a','b','c']",arr);
  });
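To see why the missing completion matters, here is a minimal sketch of a toArray-style operator. It uses a hand-rolled stand-in for an observable, not RxJS itself: createObservable, toArray and collect are hypothetical helpers written only for illustration, and the real operator also deals with errors and disposal.

```javascript
// Hand-rolled stand-in for an observable (an assumption for illustration,
// NOT the RxJS implementation).
function createObservable(producer) {
  return { subscribe: (observer) => producer(observer) };
}

// Buffers every value and emits the whole buffer only when the source completes.
function toArray(source) {
  return createObservable((observer) => {
    const buffer = [];
    return source.subscribe({
      onNext: (value) => buffer.push(value),
      onCompleted: () => {
        observer.onNext(buffer);
        observer.onCompleted();
      },
    });
  });
}

// Emits a value but never completes, like the code in the question.
const neverCompletes = createObservable((obs) => {
  obs.onNext('a');
});

// Emits a value and then completes, like the corrected code.
const completes = createObservable((obs) => {
  obs.onNext('a');
  obs.onCompleted();
});

// Records which source actually delivered an array.
const results = [];
const collect = (label) => ({
  onNext: (arr) => results.push([label, arr]),
  onCompleted: () => {},
});

toArray(neverCompletes).subscribe(collect('neverCompletes'));
toArray(completes).subscribe(collect('completes'));

console.log(results); // only the 'completes' source delivers an array
```

The buffer for neverCompletes does receive 'a', but nothing ever tells the operator that the sequence is finished, so the array is never handed on.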
Also, just as a side note, the .publish().refCount() seems wrong here. There's no need in this code to make the source observables hot.
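As a rough illustration of that side note, the sketch below uses a hand-rolled stand-in for a cold observable (createColdObservable is a hypothetical helper, not an RxJS API). It only shows that the producer restarts once per subscriber, so sharing a subscription starts to matter when there is more than one subscriber. In the question, flatMap subscribes to each inner observable exactly once.

```javascript
// Hypothetical stand-in for a cold observable (an assumption for illustration,
// NOT the RxJS implementation): every subscribe call runs the producer again.
function createColdObservable(producer) {
  return { subscribe: (observer) => producer(observer) };
}

let producerRuns = 0;
const source = createColdObservable((obs) => {
  producerRuns += 1; // count how often the producer is started
  obs.onNext('a');
  obs.onCompleted();
});

const received = [];
const observer = {
  onNext: (value) => received.push(value),
  onCompleted: () => {},
};

// One subscriber, as in the question: the producer runs once.
source.subscribe(observer);
const runsAfterOneSubscriber = producerRuns;

// A second subscriber restarts the producer; only in this situation
// would sharing one underlying subscription change anything.
source.subscribe(observer);
const runsAfterTwoSubscribers = producerRuns;

console.log(runsAfterOneSubscriber, runsAfterTwoSubscribers);
```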