Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to store accumulated result of a scan with rxjs

I have two merged observables with a scan after the merge. The first one is a simple range and the other is a Subject. Whenever the Subject emits a new value with onNext I concatenate that value in the scan and return the new array as the accumulator. If I dispose of my subscription, and then subscribe again it replays the values from the range but I have lost the ones from the Subject. In the code below I want my second subscription to have a final value of [1, 2, 3, 4, 5]

What would be the best way to do this? Right now I have another Subject where I store that final value and subscribe to that, but it feels wrong.

Here's a simple version that demonstrates what is happening:

var Rx = require('rx');

var source = Rx.Observable.range(1, 3);

var adder = new Rx.Subject();

var merged = source.merge(adder)
                    .scan([], function(accum, x) {
                        return accum.concat(x);
                    });

var subscription1 = merged.subscribe(function(x) {console.log(x)});
adder.onNext(4);
adder.onNext(5);

subscription1.dispose();

console.log('After Disposal');

var subscription2 = merged.subscribe(function(x) {console.log(x)});

This outputs:

[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
[ 1, 2, 3, 4 ]
[ 1, 2, 3, 4, 5 ]
After Disposal
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
like image 960
gabo Avatar asked Jan 09 '23 06:01

gabo


1 Answers

A Subject is a hot Observable, that's why the second subscription won't see events coming from the Subject. The range Observable is cold, so each "execution instance" is entirely owned by each subscription. On the other hand, a Subject's "execution instance" is singleton and independent, hence the second subscription doesn't see the events.

There are a couple of ways of solving this.

  1. Use a ReplaySubject. You would have to specify the buffer size. If you don't have a good limit for that buffer, using an unlimited buffer might cause memory problems.
  2. Avoid using Subject. In other words, avoid using a hot Observable, replace it with a cold Observable, and according to the problem description I gave in the beginning, your subscriptions wouldn't have the problem and they would see the same events. Normally a Subject can be replaced by an Observable, but it depends on your overall architecture. Might need to rewrite a lot. And in the worst case, such as a circular dependency of Observables, you cannot avoid using a Subject.
  3. Rearrange code such that subscriptions start before the Subject starts emitting, so all subscriptions get a chance to see "live" emissions from the hot Observables.

However, if my interpretation of this problem is correct, you only need the last event emitted by merged, so you could use a variant of alternative (1) where you replay only the last event. That would be a matter of adding .shareReplay(1) to merged, which will make it a hot replayed Observable.

like image 168
André Staltz Avatar answered Mar 10 '23 06:03

André Staltz