Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS reduce a ReplaySubject

I'm using ReactiveX/RxJS version.

Lets say that I have a Rx.ReplaySubject that every 2 seconds it emits an object that contains an id and an array with values. I want to reduce this array of values and get the sum of them all.

The problem is that ReplaySubject is a hot observable and it never completes, at least I don't want it to complete because I want the sum of that object values every 2 seconds. But in order to use the reduce operator the observable should be completed. So, how should I proceed ?

E.G not working code:

var subject = new Rx.ReplaySubject();

subject.
  map(x => x.transactions).
  // Reduce never concludes because ReplaySubject instance is not completed
  reduce((v1, v2) => v1+v2, 0).
  subscribe(function (value) {
    console.log(value)
  });

setInterval(injectData, 2000);

function injectData () {
  subject.next({id: Date.now(), transactions: [
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)}
  ]});
}
like image 272
João Mosmann Avatar asked Mar 18 '16 02:03

João Mosmann


People also ask

What is reduce in RXJS?

reduce(), reduce applies an accumulator function against an accumulation and each value of the source Observable (from the past) to reduce it to a single value, emitted on the output Observable. Note that reduce will only emit one value, only when the source Observable completes.

What is ReplaySubject RXJS?

ReplaySubject is a variant of a Subject which keeps a cache of previous values emitted by a source observable and sends them to all new observers immediately on subscription. This behavior of replaying a sequence of old values to new subscribes is where the name for this type of a subject comes from.

What is a ReplaySubject angular?

ReplaySubject replays old values to new subscribers when they first subscribe. The ReplaySubject will store every value it emits in a buffer. It will emit them to the new subscribers in the order it received them. You can configure the buffer using the arguments bufferSize and windowTime.


1 Answers

Consider using Observable.prototype.scan() (RxJS documentation)

scan() bascially aggregates an observable and emits each successive value, unlike reduce() which only emits the result upon completion. (see Rx explanation of scan and reduce)

Example using OP's code (Here's the fiddle):

var subject = new Rx.ReplaySubject();

subject
  // note: use "selectMany" to flatten observable of observables
  .selectMany(x => Rx.Observable.fromArray(x.transactions))
  // note: use "scan" to aggregate values
  .scan((agg, val) => agg+val.value, 0)
  .subscribe(function (value) {
    console.log(value)
  });

setInterval(injectData, 2000);

function injectData () {
  subject.onNext({id: Date.now(), transactions: [
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)}
  ]});
}

Another example:

The above code emits the aggregate for every transaction because of the selectMany(). If you only wanted it to emit once every 2 seconds, this is a great time to use reduce() like so (Here's another fiddle):

subject
  // note: use "selectMany" to flatten observable of observables
  // note: using "reduce" inside here so that we only emit the aggregate
  .selectMany(x => 
    Rx.Observable
      .fromArray(x.transactions)
      .reduce((agg, val) => agg + val.value, 0)
  )
  // note: use "scan" to aggregate values
  .scan((agg, val) => agg+val, 0)
  .subscribe(function (value) {
    console.log(value)
  });

Additional note:

Rx Subjects can complete; you just need to call onCompleted() when you're ready. If you make your subject complete you can still use reduce(). Compare this fiddle with the one above.

like image 59
souldzin Avatar answered Oct 15 '22 22:10

souldzin