So in Rxjs, I have bunch of code,
return Observable.from(input_array)
.concatMap((item)=>{
//this part emits an Observable.of<string> for each item in the input_array
})
.scan((output_array:string[],each_item_output_array:string)=>{
return output_array.push(each_item_output_array) ;
});
But apparently this is wrong, the scan will break the code inside the concatMap, so I want to know how to collect the output array of each item in the observable from
operator?
BehaviorSubject is the answer here. Rather than a standard subject (Observable) that just emits values as they come in, a BehaviorSubject emits the last value upon subscribe() . You can also get the last value manually using the BehaviorSubjects getValue() method.
Multiple results While a Promise only emits the result once, Observables can emit multiple values over time. In the above example the Observable emits the values 0,1,2,3,4 delayed by one second and then completes. The subscribe method is called five times. Besides multiple values, we can also detect the end of values.
Subscribing to an observable always return the value of the observable after it emits it. If you want to declare the variable as the observable itself, you simply assign it as the call: this. listeIsolements$ = this.
In your call to scan
you have not specified a seed for the accumulator. In that circumstance, the first value is used as a seed. For example:
Rx.Observable
.from(["a", "b", "c"])
.scan((acc, value) => acc + value)
.subscribe(value => console.log(value));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
In your snippet, the first value is not an array, so you cannot call push
on it. To accumulate the values into an array, you can specify an array seed like this:
Rx.Observable
.from(["a", "b", "c"])
.concatMap(value => Rx.Observable.of(value))
.scan((acc, value) => {
acc.push(value);
return acc;
}, []) // Note that an empty array is use as the seed
.subscribe(value => console.log(JSON.stringify(value)));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Although, for some use cases, it would be preferable to not mutate the array:
Rx.Observable
.from(["a", "b", "c"])
.concatMap(value => Rx.Observable.of(value))
.scan((acc, value) => [...acc, value], [])
.subscribe(value => console.log(JSON.stringify(value)));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Note that scan
emits an array for each value that it receives. If you only want a single array emitted when the observable completes, you can use the toArray
operator instead:
Rx.Observable
.from(["a", "b", "c"])
.concatMap(value => Rx.Observable.of(value))
.toArray()
.subscribe(value => console.log(JSON.stringify(value)));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Be carefull with this code:
const obs = Rx.Observable
.from(["a", "b", "c"])
.concatMap(value => Rx.Observable.of(value))
.scan((acc, value) => {
acc.push(value);
return acc;
}, []);
obs.subscribe(value => console.log(JSON.stringify(value)));
obs.subscribe(value => console.log(JSON.stringify(value)));
Result will be a bit unexpected:
["a"]
["a","b"]
["a","b","c"]
["a","b","c","a"]
["a","b","c","a","b"]
["a","b","c","a","b","c"]
"acc" variable is reference object and each subscriber gets stream data and adds data to the same object again. It might be a lot of solutions for avoiding it, this is creation new object when stream data is received again :
var obs = Rx.Observable
.from(["a", "b", "c"])
.concatMap(value => Rx.Observable.of(value))
.scan((acc, value) => {
//clone initial value
if (acc.length == 0) {
acc = [];
}
acc.push(value);
return acc
}, []); // Note that an empty array is use as the seed
obs.subscribe(value => console.log(JSON.stringify(value)));
obs.subscribe(value => console.log(JSON.stringify(value)));
result as expected:
["a"]
["a","b"]
["a","b","c"]
["a"]
["a","b"]
["a","b","c"]
I hope it saves a lot time for someone
Another option is bufferCount(count)
if you know the length of the input array you can get a single output containing that number of items. Cleaner syntax than having to remember how to use scan
.
Note: If you don't know the size (although in your example you would) then count
represents a maximum - but this may have resource constraints so don't just make it 99999.
const array = source$.pipe(bufferCount(10));
In my case I had a list of 'operations' being executed and I knew the total number of steps in advance, so bufferCount
worked quite well. However be sure to carefully consider error conditions
// one approach to handling errors that still returns an output array
// only use this if a single failure shouldn't stop the collection
const array = source$.pipe(
map((result) => ({ hasError: false, result: result }),
catchError((err) => ({ hasError: true, error: err }),
bufferCount(10));
The decision on how to handle errors will vary greatly based on what your observables actually are - but the point here is to show bufferCount()
as an option.
(There are other buffer
operations available too)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With