Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Shared observable and startWith operator

Tags:

rxjs

rxjs5

I have a question regarding multicasted observables and an unexpected (for me) behaviour I noticed.

const a = Observable.fromEvent(someDom, 'click')
  .map(e => 1)
  .startWith(-1)
  .share();

const b = a.pairwise();

a.subscribe(a => {
  console.log(`Sub 1: ${a}`);
});

a.subscribe(a => {
  console.log(`Sub 2: ${a}`)
});

b.subscribe(([prevA, curA]) => {
  console.log(`Pairwise Sub: (${prevA}, ${curA})`);
});

So, there is a shared observable a, which emits 1 on every click event. -1 is emitted due to the startWith operator. The observable b just creates a new observable by pairing up latest two values from a.

My expectation was:

[-1, 1] // first click
[ 1, 1] // all other clicks

What I observed was:

[1, 1] // from second click on, and all other clicks

What I noticed is that the value -1 is emitted immediately and consumed by Sub 1, before even Sub 2 is subscribed to the observable and since a is multicasted, Sub 2 is too late for the party.

Now, I know that I could multicast via BehaviourSubject and not use the startWith operator, but I want to understand the use case of this scenario when I use startWith and multicast via share.

As far as I understand, whenever I use .share() and .startWith(x), only one subscriber will be notified about the startWith value, since all other subscribers are subscribed after emitting the value.

So is this a reason to multicast via some special subject (Behavior/Replay...) or am I missing something about this startWith/share scenario?

Thanks!

like image 869
dinony Avatar asked Feb 03 '17 10:02

dinony


People also ask

What does StartWith do in RXJS?

RxNet StartWith Rx.NET implements this operator as StartWith . It accepts an array of items which it prepends to the resulting Observable sequence in the order they appear in the array before it emits the items from the source Observable.

What is StartWith in angular?

startWith(... values: D[]): OperatorFunction<T, T | D> Returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.


1 Answers

This is actually correct behavior.

The .startWith() emits its value to every new subscriber, not only the first one. The reason why b.subscribe(([prevA, curA]) never receives it is because you're using multicasting with .share() (aka .publish().refCount()).

This means that the first a.subscribe(...) makes the .refCount() to subscribe to its source and it'll stay subscribed (note that Observable .fromEvent(someDom, 'click') never completes).

Then when you finally call b.subscribe(...) it'll subscribe only to the Subject inside .share() and will never go through .startWith(-1) because it's multicasted and already subscribed in .share().

like image 127
martin Avatar answered Sep 21 '22 15:09

martin