The AsyncSubject
becomes observable when the last subject observer unsubscribes from the subject. Here is the quote:
When it’s done, it’s done. Subjects cannot be reused after they’re unsubscribed, completed or errored.
Here is the demo:
const ofObservable = Rx.Observable.of(1, 2, 3);
const subject = new Rx.AsyncSubject();
ofObservable.subscribe(subject);
subject.subscribe((v) => {
console.log(v);
});
subject.unsubscribe((v) => {
console.log(v);
});
// here I'll get the error "object unsubscribed"
subject.subscribe((v) => {
console.log(v);
});
How to prevent the subject from completing?
There's a share
operator:
In RxJS 5, the operator
share()
makes a hot, refCounted observable that can be retried on failure, or repeated on success. Because subjects cannot be reused once they’ve errored, completed or otherwise unsubscribed, theshare()
operator will recycle dead subjects to enable resubscription to the resulting observable.
That is what I'm looking for. But share
creates a subject and I need AsyncSubject
.
The problem is down to this line:
subject.unsubscribe((v) => {
console.log(v);
});
Subject
implements ISubscription
; which means it has an unsubscribe
method and a closed
property. Its implementation of unsubscribe
is as follows:
unsubscribe() {
this.isStopped = true;
this.closed = true;
this.observers = null;
}
Which is somewhat brutal. Essentially, it severs all communication with any subscribers to the subject without unsubscribing them. Similarly, it does not unsubscribe the subject itself from any observable it might happen to be subscribed to. (It also marks the subject as closed/stopped, which was the reason for your error.)
Given that it doesn't acutally perform any unsubscriptions, how it's supposed to be used is unclear. The description of this test:
it('should disallow new subscriber once subject has been disposed', () => {
suggests that it might be some sort of hangover from RxJS 4 - in which unsubscription was termed disposal.
Whatever the reason for its being, I would recommend never calling it. By way of example, look at this snippet:
const source = Rx.Observable
.interval(200)
.take(5)
.do(value => console.log(`source: ${value}`));
const subject = new Rx.Subject();
source.subscribe(subject);
const subscription = subject
.switchMap(() => Rx.Observable
.interval(200)
.take(5)
.delay(500))
.subscribe(value => console.log(`subscription: ${value}`));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
It subscribes a subject to a source observable and then subscribes to an observable composed from the subject.
If unsubscribe
is called on the subject, a couple of problems become evident:
next
method; andinterval
observable within the switchMap
keeps emitting after the unsubscribe
call is made.Try it out:
const source = Rx.Observable
.interval(200)
.take(5)
.do(value => console.log(`source: ${value}`));
const subject = new Rx.Subject();
source.subscribe(subject);
const subscription = subject
.switchMap(() => Rx.Observable
.interval(200)
.take(5)
.delay(500))
.subscribe(value => console.log(`subscription: ${value}`));
setTimeout(() => {
console.log("subject.unsubscribe()");
subject.unsubscribe();
}, 700);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
None of this seems like desirable behaviour, so calling unsubscribe
on a Subject
is something to be avoided.
Instead, the code in your snippet should unsubscribe using the Subscription
returned by the subscribe
call:
const subscription = subject.subscribe((v) => {
console.log(v);
});
subscription.unsubscribe();
Subsequent to writing this answer, I've found the following comment from Ben Lesh, that fits with my theory that it's related to the disposal of the subject:
If you want the subject to loudly and angrily error when you
next
to it after it’s done being useful, you can callunsubscribe
directly on the subject instance itself.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With