
RxJS - How to use toArray() with an array of asynchronous observables?

I'm creating an array of asynchronous observables with Rx.Observable.create() and hope to use .toArray() to get all the values when they complete.

console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
  return Rx.Observable.create((obs)=>{
    let tid = setTimeout(()=>{
      console.log(val + ' timing out');
      obs.onNext(val);
    },i*500);
    return ()=>{
      clearTimeout(tid);
    };
  }).publish().refCount();
});

Rx.Observable.from(valsArray)
.flatMap((v)=>v)
.toArray()
.subscribe((arr)=>{
  console.log("arr should be ['a','b','c']",arr);
});

The example above is at http://jsbin.com/wegoha/10/edit?js,console.

I'm using setTimeout as a stand-in for other asynchronous operations to keep the example simple.

Adam asked Feb 19 '16 03:02



1 Answer

The code is correct, except that you never complete the source observables.

The toArray() operator can only emit its array once the source observable completes. Because the observables built with Rx.Observable.create never call onCompleted(), the query can never end.

Try this:

console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
  return Rx.Observable.create((obs)=>{
    let tid = setTimeout(()=>{
      console.log(val + ' timing out');
      obs.onNext(val);
      obs.onCompleted(); // signal completion so toArray() can emit its array
    },i*500);
    return ()=>{
      clearTimeout(tid);
    };
  }).publish().refCount();
});

Rx.Observable.from(valsArray)
.flatMap((v)=>v)
.toArray()
.subscribe((arr)=>{
  console.log("arr should be ['a','b','c']",arr);
});
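
To see why completion matters, here is a minimal sketch in plain JavaScript with no RxJS dependency. It is a simplified model, not the library's implementation: toArraySketch, collect, and the two sources are hypothetical names made up for illustration. The sketch buffers every value and only hands the array downstream when the source signals completion.

```javascript
// Simplified model of toArray(); this is NOT how RxJS implements it.
// A "source" here is just a function that pushes values into an observer.
function toArraySketch(source) {
  return function subscribe(observer) {
    const buffer = [];
    source({
      // Values are only collected; nothing is passed downstream yet.
      onNext: (value) => { buffer.push(value); },
      // Completion is the only point at which the array is released.
      onCompleted: () => {
        observer.onNext(buffer);
        observer.onCompleted();
      },
    });
  };
}

// Hypothetical sources mirroring the question (no completion) and the fix.
const withoutCompletion = (obs) => { obs.onNext('a'); };
const withCompletion = (obs) => { obs.onNext('a'); obs.onCompleted(); };

// Helper that records every array the sketch emits for a given source.
function collect(source) {
  const emitted = [];
  toArraySketch(source)({
    onNext: (arr) => { emitted.push(arr); },
    onCompleted: () => {},
  });
  return emitted;
}

const emittedWithoutCompletion = collect(withoutCompletion);
const emittedWithCompletion = collect(withCompletion);

console.log(emittedWithoutCompletion.length); // 0
console.log(emittedWithCompletion);           // [ [ 'a' ] ]
```

The source that never completes produces no array at all, which matches the symptom in the question.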

Also, as a side note, the .publish().refCount() seems wrong here: nothing in this code needs the source observables to be hot.
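
As a rough illustration of that point, the sketch below models a cold source in plain JavaScript (again not RxJS itself; makeColdSource and stats are hypothetical names). In the code above each inner observable is subscribed to exactly once by flatMap, so the producer runs once either way and there is nothing for a shared subscription to save.

```javascript
// Hypothetical cold source: the producer body runs once per subscription.
function makeColdSource(value, stats) {
  return function subscribe(observer) {
    stats.producerRuns += 1; // count how often the producer is started
    observer.onNext(value);
    observer.onCompleted();
  };
}

const stats = { producerRuns: 0 };
const coldSource = makeColdSource('a', stats);

// A single subscriber, as in the flatMap pipeline above.
const received = [];
coldSource({
  onNext: (v) => { received.push(v); },
  onCompleted: () => {},
});

console.log(stats.producerRuns); // 1
console.log(received);           // [ 'a' ]
```

Sharing would only start to matter if several subscribers attached to the same source and the producer should not run once per subscriber.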

Enigmativity answered Sep 25 '22 13:09