
How to prevent AsyncSubject from completing when the last observer unsubscribes

The AsyncSubject can no longer be subscribed to once the last observer unsubscribes from it. Here is the quote:

When it’s done, it’s done. Subjects cannot be reused after they’re unsubscribed, completed or errored.

Here is the demo:

const ofObservable = Rx.Observable.of(1, 2, 3);
const subject = new Rx.AsyncSubject();

ofObservable.subscribe(subject);

subject.subscribe((v) => {
    console.log(v);
});

subject.unsubscribe((v) => {
    console.log(v);
});

// here I'll get the error "object unsubscribed"
subject.subscribe((v) => {
    console.log(v);
});

How to prevent the subject from completing?

There's a share operator:

In RxJS 5, the operator share() makes a hot, refCounted observable that can be retried on failure, or repeated on success. Because subjects cannot be reused once they’ve errored, completed or otherwise unsubscribed, the share() operator will recycle dead subjects to enable resubscription to the resulting observable.

That is what I'm looking for. But share creates a subject and I need AsyncSubject.

Max Koretskyi asked Dec 11 '22 10:12


1 Answer

The problem is down to this line:

subject.unsubscribe((v) => {
    console.log(v);
});

Subject implements ISubscription, which means it has an unsubscribe method and a closed property. Its implementation of unsubscribe is as follows:

unsubscribe() {
  this.isStopped = true;
  this.closed = true;
  this.observers = null;
}

Which is somewhat brutal. Essentially, it severs all communication with any subscribers to the subject without unsubscribing them. Similarly, it does not unsubscribe the subject itself from any observable it might happen to be subscribed to. (It also marks the subject as closed/stopped, which was the reason for your error.)
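
To make that behaviour concrete, here is a minimal sketch in plain JavaScript. MiniSubject is a hypothetical stand-in written for this illustration, not the RxJS class; only its unsubscribe method mirrors the implementation quoted above, and the rest is an assumption about how a subject might guard against use after being closed.

```javascript
// Simplified stand-in for illustration only; NOT the real RxJS Subject.
class MiniSubject {
  constructor() {
    this.observers = [];
    this.isStopped = false;
    this.closed = false;
  }

  // Registers a callback and returns a per-observer subscription handle.
  subscribe(next) {
    if (this.closed) {
      throw new Error('object unsubscribed');
    }
    this.observers.push(next);
    return {
      unsubscribe: () => {
        // Removes only this observer; the subject itself stays open.
        if (this.observers) {
          const index = this.observers.indexOf(next);
          if (index !== -1) this.observers.splice(index, 1);
        }
      },
    };
  }

  // Delivers a value to every registered observer.
  next(value) {
    if (this.closed) {
      throw new Error('object unsubscribed');
    }
    this.observers.slice().forEach((observer) => observer(value));
  }

  // Mirrors the implementation quoted above: nobody is notified,
  // the observer list is simply dropped and the subject is closed.
  unsubscribe() {
    this.isStopped = true;
    this.closed = true;
    this.observers = null;
  }
}

const received = [];
const mini = new MiniSubject();
mini.subscribe((v) => received.push(v));
mini.next(1);       // delivered to the observer
mini.unsubscribe(); // drops every observer and closes the subject

let subscribeError = null;
try {
  mini.subscribe((v) => received.push(v));
} catch (err) {
  subscribeError = err.message;
}
console.log(received, subscribeError);
```

In this model the first observer is never told that it has been cut off, and any later subscribe call fails, which matches the symptom described in the question.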

Given that it doesn't actually perform any unsubscriptions, it's unclear how it's supposed to be used. The description of this test:

it('should disallow new subscriber once subject has been disposed', () => {

suggests that it might be some sort of hangover from RxJS 4 - in which unsubscription was termed disposal.

Whatever the reason for its being, I would recommend never calling it. By way of example, look at this snippet:

const source = Rx.Observable
  .interval(200)
  .take(5)
  .do(value => console.log(`source: ${value}`));

const subject = new Rx.Subject();
source.subscribe(subject);

const subscription = subject
  .switchMap(() => Rx.Observable
    .interval(200)
    .take(5)
    .delay(500))
  .subscribe(value => console.log(`subscription: ${value}`));

It subscribes a subject to a source observable and then subscribes to an observable composed from the subject.

If unsubscribe is called on the subject, a couple of problems become evident:

  • the subject's subscription to the source is not unsubscribed, so an error is thrown when the source next attempts to call the subject's next method; and
  • the subscription to the observable composed from the subject is not unsubscribed, so the interval observable within the switchMap keeps emitting after the unsubscribe call is made.

Try it out:

const source = Rx.Observable
  .interval(200)
  .take(5)
  .do(value => console.log(`source: ${value}`));

const subject = new Rx.Subject();
source.subscribe(subject);

const subscription = subject
  .switchMap(() => Rx.Observable
    .interval(200)
    .take(5)
    .delay(500))
  .subscribe(value => console.log(`subscription: ${value}`));

setTimeout(() => {
  console.log("subject.unsubscribe()");
  subject.unsubscribe();
}, 700);

None of this seems like desirable behaviour, so calling unsubscribe on a Subject is something to be avoided.

Instead, the code in your snippet should unsubscribe using the Subscription returned by the subscribe call:

const subscription = subject.subscribe((v) => {
  console.log(v);
});
subscription.unsubscribe();
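
As a rough illustration of why this works for the AsyncSubject case in the question, the sketch below models the caching behaviour in plain JavaScript. MiniAsyncSubject is a hypothetical stand-in, not the RxJS class: it assumes only that the last value is held until completion and is then handed to every current and future subscriber.

```javascript
// Hypothetical stand-in that models AsyncSubject-style caching; NOT the RxJS class.
class MiniAsyncSubject {
  constructor() {
    this.observers = [];
    this.hasValue = false;
    this.value = undefined;
    this.completed = false;
  }

  // Remembers only the most recent value; nothing is emitted yet.
  next(value) {
    if (this.completed) return;
    this.value = value;
    this.hasValue = true;
  }

  // Emits the remembered value to everyone currently subscribed.
  complete() {
    if (this.completed) return;
    this.completed = true;
    const observers = this.observers;
    this.observers = [];
    if (this.hasValue) observers.forEach((observer) => observer(this.value));
  }

  // Late subscribers receive the cached value immediately.
  subscribe(observer) {
    if (this.completed) {
      if (this.hasValue) observer(this.value);
      return { unsubscribe() {} };
    }
    this.observers.push(observer);
    return {
      // Removes just this observer and leaves the subject untouched.
      unsubscribe: () => {
        const index = this.observers.indexOf(observer);
        if (index !== -1) this.observers.splice(index, 1);
      },
    };
  }
}

const asyncSubject = new MiniAsyncSubject();
[1, 2, 3].forEach((v) => asyncSubject.next(v)); // stands in for Observable.of(1, 2, 3)
asyncSubject.complete();

const log = [];
const first = asyncSubject.subscribe((v) => log.push(`first: ${v}`));
first.unsubscribe(); // releases only the first observer
asyncSubject.subscribe((v) => log.push(`second: ${v}`)); // still allowed
console.log(log);
```

Because only the per-observer handle is unsubscribed, the subject is never closed and the second subscriber still receives the cached value.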

After writing this answer, I found the following comment from Ben Lesh, which fits my theory that the method is related to the disposal of the subject:

If you want the subject to loudly and angrily error when you next to it after it’s done being useful, you can call unsubscribe directly on the subject instance itself.

cartant answered May 06 '23 05:05
