Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

rxjs 5 publishReplay refCount

Tags:

I can't figure out how publishReplay().refCount() works.

For example (https://jsfiddle.net/7o3a45L1/):

var source = Rx.Observable.create(observer =>  {   console.log("call");    // expensive http request   observer.next(5); }).publishReplay().refCount();  subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)}); subscription1.unsubscribe(); console.log("");   subscription2 = source.subscribe({next: (v) => console.log('observerB: ' + v)}); subscription2.unsubscribe(); console.log("");   subscription3 = source.subscribe({next: (v) => console.log('observerC: ' + v)}); subscription3.unsubscribe(); console.log("");   subscription4 = source.subscribe({next: (v) => console.log('observerD: ' + v)}); subscription4.unsubscribe(); 

gives the following result:

call observerA: 5

observerB: 5 call observerB: 5

observerC: 5 observerC: 5 call observerC: 5

observerD: 5 observerD: 5 observerD: 5 call observerD: 5

1) Why are observerB, C and D called multiple times?

2) Why "call" is printed on each line and not in the beginning of the line?

Also, if i call publishReplay(1).refCount(), it calls observerB, C and D 2 times each.

What i expect is that every new observer receives the value 5 exactly once and "call" is printed only once.

like image 732
Oleg Gello Avatar asked Feb 12 '17 15:02

Oleg Gello


People also ask

What is publishReplay in angular?

publishReplay makes it possible to share a single subscription to the underlying stream between multiple subscribers and replay a set of values that happened before the underlying stream completed.

How RXJS share works?

sharelink. Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable.


1 Answers

publishReplay(x).refCount() combined does the following:

  • It create a ReplaySubject which replay up to x emissions. If x is not defined then it replays the complete stream.
  • It makes this ReplaySubject multicast compatible using a refCount() operator. This results in concurrent subscriptions receiving the same emissions.

Your example contains a few issues clouding how it all works together. See the following revised snippet:

var state = 5  var realSource = Rx.Observable.create(observer =>  {    console.log("creating expensive HTTP-based emission");     observer.next(state++);  //  observer.complete();        return () => {      console.log('unsubscribing from source')    }  });      var source = Rx.Observable.of('')    .do(() => console.log('stream subscribed'))    .ignoreElements()    .concat(realSource)  .do(null, null, () => console.log('stream completed'))  .publishReplay()  .refCount()  ;        subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});  subscription1.unsubscribe();     subscription2 = source.subscribe(v => console.log('observerB: ' + v));  subscription2.unsubscribe();        subscription3 = source.subscribe(v => console.log('observerC: ' + v));  subscription3.unsubscribe();        subscription4 = source.subscribe(v => console.log('observerD: ' + v));   
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

When running this snippet we can see clearly that it is not emitting duplicate values for Observer D, it is in fact creating new emissions for every subscription. How come?

Every subscription is unsubscribed before the next subscription takes place. This effectively makes the refCount decrease back to zero, no multicasting is being done.

The issue resides in the fact that the realSource stream does not complete. Because we are not multicasting the next subscriber gets a fresh instance of realSource through the ReplaySubject and the new emissions are prepended with the previous already emitted emissions.

So to fix your stream from invoking the expensive HTTP request multiple times you have to complete the stream so the publishReplay knows it does not need to re-subscribe.

like image 173
Mark van Straten Avatar answered Dec 06 '22 02:12

Mark van Straten