I'm using ReactiveX/RxJS version.
Lets say that I have a Rx.ReplaySubject that every 2 seconds it emits an object that contains an id and an array with values. I want to reduce this array of values and get the sum of them all.
The problem is that ReplaySubject is a hot observable and it never completes, at least I don't want it to complete because I want the sum of that object values every 2 seconds. But in order to use the reduce operator the observable should be completed. So, how should I proceed ?
E.G not working code:
var subject = new Rx.ReplaySubject();
subject.
map(x => x.transactions).
// Reduce never concludes because ReplaySubject instance is not completed
reduce((v1, v2) => v1+v2, 0).
subscribe(function (value) {
console.log(value)
});
setInterval(injectData, 2000);
function injectData () {
subject.next({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}
reduce(), reduce applies an accumulator function against an accumulation and each value of the source Observable (from the past) to reduce it to a single value, emitted on the output Observable. Note that reduce will only emit one value, only when the source Observable completes.
ReplaySubject is a variant of a Subject which keeps a cache of previous values emitted by a source observable and sends them to all new observers immediately on subscription. This behavior of replaying a sequence of old values to new subscribes is where the name for this type of a subject comes from.
ReplaySubject replays old values to new subscribers when they first subscribe. The ReplaySubject will store every value it emits in a buffer. It will emit them to the new subscribers in the order it received them. You can configure the buffer using the arguments bufferSize and windowTime.
Consider using Observable.prototype.scan()
(RxJS documentation)
scan()
bascially aggregates an observable and emits each successive value, unlike reduce()
which only emits the result upon completion. (see Rx explanation of scan and reduce)
Example using OP's code (Here's the fiddle):
var subject = new Rx.ReplaySubject();
subject
// note: use "selectMany" to flatten observable of observables
.selectMany(x => Rx.Observable.fromArray(x.transactions))
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val.value, 0)
.subscribe(function (value) {
console.log(value)
});
setInterval(injectData, 2000);
function injectData () {
subject.onNext({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}
Another example:
The above code emits the aggregate for every transaction because of the selectMany()
. If you only wanted it to emit once every 2 seconds, this is a great time to use reduce()
like so (Here's another fiddle):
subject
// note: use "selectMany" to flatten observable of observables
// note: using "reduce" inside here so that we only emit the aggregate
.selectMany(x =>
Rx.Observable
.fromArray(x.transactions)
.reduce((agg, val) => agg + val.value, 0)
)
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val, 0)
.subscribe(function (value) {
console.log(value)
});
Additional note:
Rx Subjects can complete; you just need to call onCompleted()
when you're ready. If you make your subject complete you can still use reduce()
. Compare this fiddle with the one above.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With