How to collect array of emitted values from Observable.from?

In RxJS, I have the following code:

return Observable.from(input_array)
           .concatMap((item)=>{
               //this part emits an Observable.of<string> for each item in the input_array
           })
           .scan((output_array:string[],each_item_output_array:string)=>{
               return output_array.push(each_item_output_array) ;
           });

But apparently this is wrong: the scan breaks the code inside the concatMap. How can I collect the output of each item emitted by Observable.from into a single array?

tomriddle_1234 asked Feb 18 '17 06:02


3 Answers

In your call to scan you have not specified a seed for the accumulator. In that circumstance, the first value is used as a seed. For example:

Rx.Observable
  .from(["a", "b", "c"])
  .scan((acc, value) => acc + value)
  .subscribe(value => console.log(value));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

In your snippet, the first value is not an array, so you cannot call push on it. To accumulate the values into an array, you can specify an array seed like this:

Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .scan((acc, value) => {
    acc.push(value);
    return acc;
  }, []) // Note that an empty array is used as the seed
  .subscribe(value => console.log(JSON.stringify(value)));

Although, for some use cases, it would be preferable to not mutate the array:

Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .scan((acc, value) => [...acc, value], [])
  .subscribe(value => console.log(JSON.stringify(value)));

Note that scan emits an array for each value that it receives. If you only want a single array emitted when the observable completes, you can use the toArray operator instead:

Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .toArray()
  .subscribe(value => console.log(JSON.stringify(value)));
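
The snippet in the question has a second problem besides the missing seed: its reducer returns the result of push, and Array.prototype.push returns the array's new length rather than the array. The sketch below reproduces both problems without RxJS. It uses Array.prototype.reduce as a stand-in for scan, which is an assumption made only for illustration: both take the first value as the accumulator when no seed is given, although reduce yields only the final value.

```javascript
const input = ["a", "b", "c"];

// No seed: the accumulator starts as the first value, the string "a",
// which has no push method.
let seedlessError = null;
try {
  input.reduce((acc, value) => acc.push(value));
} catch (err) {
  seedlessError = err;
}
console.log(seedlessError instanceof TypeError); // true

// Seeded, but returning push(): push returns the new length (a number),
// so the next call tries to push onto 1 and throws as well.
let returnPushError = null;
try {
  input.reduce((acc, value) => acc.push(value), []);
} catch (err) {
  returnPushError = err;
}
console.log(returnPushError instanceof TypeError); // true

// Seeded, and returning the array itself.
const collected = input.reduce((acc, value) => {
  acc.push(value);
  return acc;
}, []);
console.log(JSON.stringify(collected)); // ["a","b","c"]
```

So the reducer needs both an array seed and an explicit return of the array, as in the scan examples above.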
cartant answered Oct 20 '22 08:10


Be careful with this code:

    const obs = Rx.Observable
      .from(["a", "b", "c"])
      .concatMap(value => Rx.Observable.of(value))
      .scan((acc, value) => {
        acc.push(value);
        return acc;
      }, []);
    obs.subscribe(value => console.log(JSON.stringify(value)));
    obs.subscribe(value => console.log(JSON.stringify(value)));

The result is a bit unexpected:

 ["a"]
 ["a","b"]
 ["a","b","c"]
 ["a","b","c","a"]
 ["a","b","c","a","b"]
 ["a","b","c","a","b","c"]

The "acc" variable holds a reference to an object: the seed array is created once, so each subscriber receives the stream data and pushes it into the same array. There are many ways to avoid this; one is to create a new array whenever the stream data is received again:

    const obs = Rx.Observable
      .from(["a", "b", "c"])
      .concatMap(value => Rx.Observable.of(value))
      .scan((acc, value) => {
        // An empty accumulator is still the shared seed: replace it with a fresh array
        if (acc.length === 0) {
          acc = [];
        }
        acc.push(value);
        return acc;
      }, []); // Note that an empty array is used as the seed
    obs.subscribe(value => console.log(JSON.stringify(value)));
    obs.subscribe(value => console.log(JSON.stringify(value)));

The result is as expected:

 ["a"]
 ["a","b"]
 ["a","b","c"]
 ["a"]
 ["a","b"]
 ["a","b","c"]
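
Returning a new array from the reducer instead of pushing (the non-mutating variant from the first answer) should avoid the problem as well, because the seed is never modified. Here is a minimal sketch without RxJS. Note that runScan is a hypothetical helper, not an RxJS function: it replays the values through a reducer the way each subscription would, reusing one seed object.

```javascript
// Hypothetical stand-in for one subscription to from(values).scan(reducer, seed):
// returns every intermediate accumulator, serialized at the moment it is emitted.
function runScan(values, reducer, seed) {
  const emitted = [];
  let acc = seed;
  for (const value of values) {
    acc = reducer(acc, value);
    emitted.push(JSON.stringify(acc));
  }
  return emitted;
}

const values = ["a", "b", "c"];

// Mutating reducer: both runs push into the one shared seed array.
const sharedSeed = [];
const mutate = (acc, value) => { acc.push(value); return acc; };
const mutatingFirst = runScan(values, mutate, sharedSeed);
const mutatingSecond = runScan(values, mutate, sharedSeed);

// Non-mutating reducer: the seed stays empty, so each run starts fresh.
const untouchedSeed = [];
const append = (acc, value) => [...acc, value];
const pureFirst = runScan(values, append, untouchedSeed);
const pureSecond = runScan(values, append, untouchedSeed);

console.log(mutatingSecond.join(" ")); // ["a","b","c","a"] ["a","b","c","a","b"] ["a","b","c","a","b","c"]
console.log(pureSecond.join(" "));     // ["a"] ["a","b"] ["a","b","c"]
```

The trade-off is one array copy per emitted value, which is usually negligible for short inputs.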

I hope this saves someone a lot of time.

Oleg Bondarenko answered Oct 20 '22 07:10


Another option is bufferCount(count): if you know the length of the input array, you get a single output array containing that number of items. The syntax is cleaner than having to remember how to use scan.

Note: If you don't know the size (although in your example you would), then count acts as a maximum. A very large count may have resource implications, though, so don't just set it to 99999.

 const array = source$.pipe(bufferCount(10));
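
A rough model of that "maximum" behavior: bufferCount(count) emits a buffer each time count values have arrived and, when the source completes, emits whatever is left over. The sketch below imitates this on a plain array. The chunk function is a hypothetical helper used only for illustration, not part of RxJS.

```javascript
// Hypothetical model of bufferCount(count) applied to a finite source:
// full buffers are emitted as they fill, and a non-empty remainder on completion.
function chunk(items, count) {
  const buffers = [];
  for (let i = 0; i < items.length; i += count) {
    buffers.push(items.slice(i, i + count));
  }
  return buffers;
}

const larger = chunk(["a", "b", "c"], 10); // count above the input length
const exact = chunk(["a", "b", "c"], 3);   // count equal to the input length
const smaller = chunk(["a", "b", "c"], 2); // count below the input length

console.log(JSON.stringify(larger));  // [["a","b","c"]]
console.log(JSON.stringify(exact));   // [["a","b","c"]]
console.log(JSON.stringify(smaller)); // [["a","b"],["c"]]
```

A count below the input length yields several arrays rather than one, so the count should be at least the number of items you expect.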

In my case I had a list of 'operations' being executed and I knew the total number of steps in advance, so bufferCount worked quite well. However, be sure to consider error conditions carefully:

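 // one approach to handling errors that still returns an output array
 // only use this if a single failure shouldn't stop the collection
 // (RxJS 6-style imports assumed; catchError has to return an observable, hence of())
 import { of } from 'rxjs';
 import { bufferCount, catchError, map } from 'rxjs/operators';

 const array = source$.pipe(
     map((result) => ({ hasError: false, result: result })),
     catchError((err) => of({ hasError: true, error: err })),
     bufferCount(10));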

The decision on how to handle errors will vary greatly based on what your observables actually are - but the point here is to show bufferCount() as an option.

(There are other buffer operations available too)

Simon_Weaver answered Oct 20 '22 06:10