
RxJS share() operator with BehaviorSubject and async pipe - Angular

I have a BehaviorSubject that is being consumed as an observable:

testForStack$: Observable<boolean>;

ngOnInit(){
    const bs = new BehaviorSubject(true);
    this.testForStack$ = bs
      .asObservable()
      .do(t => console.log('subscribed'))
      .share();
}

This observable is being piped through three async pipes in the template:

Sub1: {{testForStack$ | async}}<br>
Sub2: {{testForStack$ | async}}<br>
Sub3: {{testForStack$ | async}}

The issue is that only the first binding (Sub1) receives the value true:

Sub1: true
Sub2: 
Sub3:

If I remove the .share(), all three bindings receive true, but then each async pipe creates its own subscription to the source.

Any thoughts on why using the BehaviorSubject causes this behavior? It's being used as an observable so I would assume the above code would work correctly.

This is similar to this answer as well.

Tony Scialo asked Oct 10 '18 14:10


3 Answers

Don't use the share operator. Instead do something like this:

<ng-container *ngIf="testForStack$ | async as testForStack">

  Sub1: {{ testForStack }}
  Sub2: {{ testForStack }} 
  Sub3: {{ testForStack }}

</ng-container>

There are various other ways. For example, if you don't want to use *ngIf (it renders nothing while the emitted value is falsy, which matters for a boolean stream like this one), you could take the same approach with a template and ngTemplateOutlet. That also lets you create an alias variable in the same manner:

<ng-template
  let-testForStack 
  [ngTemplateOutletContext]="{ $implicit: testForStack$ | async }"
  [ngTemplateOutlet]="selfie" #selfie>

  Sub1: {{ testForStack }}
  Sub2: {{ testForStack }} 
  Sub3: {{ testForStack }}

</ng-template>

This ng-template code is self-referencing (which is valid) and wholly untested, but it "should" work and avoids the use of *ngIf.

Read more here:

https://nitayneeman.com/posts/using-single-subscription-for-multiple-async-pipes-in-angular/

danday74 answered Oct 08 '22 07:10


This is the correct behavior. The share() operator keeps only one subscription to its parent, and a BehaviorSubject replays its current value only at the moment something subscribes to it.

This means that the first {{testForStack$ | async}} subscribes at the end of the chain to share(), which subscribes to its parent, which in turn subscribes to the source BehaviorSubject. The BehaviorSubject then emits its value immediately.

However, the second and all subsequent {{testForStack$ | async}} bindings subscribe to a share() that is already subscribed to its parent. It won't make any more subscriptions, so nothing pushes the source value to these later observers.
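The subscription order described above can be sketched with a small toy model. The classes ToyBehaviorSubject and ToyShare below are hypothetical stand-ins written for this illustration; they are not RxJS and only mimic the two behaviors that matter here (replay on subscription, and a single shared upstream subscription).

```typescript
// Toy model only: these classes are NOT RxJS. They mimic just enough
// behavior to show why a late subscriber misses the initial value.
type Observer<T> = (value: T) => void;

// Remembers the latest value and hands it to each new subscriber immediately.
class ToyBehaviorSubject<T> {
  private observers: Observer<T>[] = [];
  private current: T;

  constructor(initial: T) {
    this.current = initial;
  }

  subscribe(observer: Observer<T>): void {
    this.observers.push(observer);
    observer(this.current); // the replay happens here, at subscription time
  }

  next(value: T): void {
    this.current = value;
    this.observers.forEach(o => o(value));
  }
}

// Subscribes to the source once and relays values to whoever is
// listening at the moment a value arrives. It stores nothing itself.
class ToyShare<T> {
  private observers: Observer<T>[] = [];
  private connected = false;
  private source: ToyBehaviorSubject<T>;

  constructor(source: ToyBehaviorSubject<T>) {
    this.source = source;
  }

  subscribe(observer: Observer<T>): void {
    this.observers.push(observer);
    if (!this.connected) {
      this.connected = true;
      this.source.subscribe(v => this.observers.forEach(o => o(v)));
    }
  }
}

const source = new ToyBehaviorSubject(true);
const shared = new ToyShare(source);
const sub1: boolean[] = [];
const sub2: boolean[] = [];

shared.subscribe(v => sub1.push(v)); // triggers the one source subscription
shared.subscribe(v => sub2.push(v)); // too late: true was already relayed
// At this point sub1 is [true] and sub2 is []

source.next(false);
// Now sub1 is [true, false] and sub2 is [false]
```

Later emissions reach every subscriber; only the value replayed at subscription time is lost, which matches the empty Sub2 and Sub3 in the question.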

An easy solution is shareReplay(1). Depending on your RxJS version, though, you should probably use publishReplay(1).refCount() (or its pipeable equivalent) instead, because of these issues:

  • https://github.com/ReactiveX/rxjs/issues/3336

  • https://github.com/ReactiveX/rxjs/issues/3127

martin answered Oct 08 '22 07:10


Update for RxJS 7 / 2022:

I found this question because I started adding share to pipelines that began with a BehaviorSubject, but included expensive intermediate transforms that were running once for each subscriber.

The pattern I think is going to work for me is:

const transformedValues$ = subj.pipe(
  map(expensiveTransform),
  shareReplay({bufferSize: 1, refCount: true})
);

This addresses the comment @martin made about needing to manually include refCount. The transform shouldn't run at all if transformedValues$ has no subscribers. On the first subscription, the transform should run once; subsequent subscribers should get a replay from the shareReplay operator.
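A minimal sketch for checking that claim, assuming RxJS 6.4 or later (where shareReplay accepts a config object) is installed. The counter transformRuns and the doubling expensiveTransform are hypothetical stand-ins for whatever the real pipeline does.

```typescript
import { BehaviorSubject } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';

// Hypothetical stand-in for an expensive transform; the counter
// records how many times it actually runs.
let transformRuns = 0;
const expensiveTransform = (n: number): number => {
  transformRuns++;
  return n * 2;
};

const subj = new BehaviorSubject<number>(1);
const transformedValues$ = subj.pipe(
  map(expensiveTransform),
  shareReplay({ bufferSize: 1, refCount: true })
);

const first: number[] = [];
const second: number[] = [];

// The first subscriber connects to subj, so the transform runs for the value 1.
const subA = transformedValues$.subscribe(v => first.push(v));
// The second subscriber is served from the replay buffer; no extra transform run.
const subB = transformedValues$.subscribe(v => second.push(v));

// A new source value runs the transform once and reaches both subscribers.
subj.next(5);

// With refCount: true, dropping the last subscriber disconnects from subj.
subA.unsubscribe();
subB.unsubscribe();

console.log(transformRuns); // 2
console.log(first, second); // [ 2, 10 ] [ 2, 10 ]
```

The import from 'rxjs/operators' is used because it works in both RxJS 6 and 7; in RxJS 7.2 and later the same operators can also be imported from 'rxjs' directly.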

Coderer answered Oct 08 '22 07:10