What is the cleanest way to perform a side-effect when the last subscription for a RxJS Observable is disposed? This may occur before the Observable has terminated.
Let's say that I need a function returning an Observable that emits changes to a resource. I'd like to perform a cleanup action when all subscriptions have been disposed.
var observable = streamResourceChanges(resource);
var subscription1 = observable.subscribe(observer1);
var subscription2 = observable.subscribe(observer2);
// ...
subscription1.dispose(); // Does not perform the cleanup
subscription2.dispose(); // Performs the cleanup
The only way I've found to define a subscription disposal action is to use Rx.Observable.create. The last disposal can be handled by sharing a subscription, for example with Observable.prototype.singleInstance().
For example:
function streamResourceChanges(resource) {
    return Rx.Observable.create(function(observer) {
        // Subscribe the observer for resource changes...
        // Return a cleanup function
        return function() {
            // Perform cleanup here...
            console.log("Cleanup performed!");
        };
    }).singleInstance();
}
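The sharing step is what makes the cleanup run only once, on the last disposal, and the idea can be pictured as plain reference counting. The following is a dependency-free sketch, not the RxJS implementation: `shareWithCleanup`, `subscribeSource` and the fake resource are hypothetical names introduced for illustration, and the returned objects only mimic the `subscribe`/`dispose` shape.

```javascript
// Hypothetical helper (not an RxJS API): shares one source subscription
// among all subscribers and runs `cleanup` when the last one disposes.
function shareWithCleanup(subscribeSource, cleanup) {
  var listeners = [];
  var sourceSubscription = null;

  return {
    subscribe: function (onNext) {
      var entry = { onNext: onNext };
      listeners.push(entry);
      if (listeners.length === 1) {
        // First subscriber: connect to the underlying source once.
        sourceSubscription = subscribeSource(function (value) {
          listeners.slice().forEach(function (l) { l.onNext(value); });
        });
      }
      var disposed = false;
      return {
        dispose: function () {
          if (disposed) { return; } // disposing twice is a no-op
          disposed = true;
          listeners.splice(listeners.indexOf(entry), 1);
          if (listeners.length === 0) {
            // Last subscriber gone: disconnect, then clean up.
            sourceSubscription.dispose();
            sourceSubscription = null;
            cleanup();
          }
        }
      };
    }
  };
}

// Usage with a fake resource that exposes a single change callback.
var cleanupCount = 0;
var notifyChange = null;
var changes = shareWithCleanup(
  function (push) {
    notifyChange = push;
    return { dispose: function () { notifyChange = null; } };
  },
  function () { cleanupCount += 1; }
);

var seen1 = [];
var seen2 = [];
var subscription1 = changes.subscribe(function (v) { seen1.push(v); });
var subscription2 = changes.subscribe(function (v) { seen2.push(v); });
notifyChange("changed");

subscription1.dispose();                  // does not perform the cleanup
var cleanupsAfterFirstDispose = cleanupCount;
subscription2.dispose();                  // performs the cleanup
```

Both observers receive the change through the single shared connection, and the cleanup counter only moves when the second subscription is disposed.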
Is there a neater way to define a side-effect for subscription disposal, similar to doOnNext, doOnCompleted or doOnError?
var withCleanup = withoutCleanup.doOnDispose(function() {
    // Perform cleanup here...
});
In an Angular application specifically, subscriptions must be unsubscribed before Angular destroys the component. Failure to do so could create a memory leak.
This can be achieved with the async pipe, or with RxJS operators such as take and first, which unsubscribe automatically once they complete. If you have no other choice than to call unsubscribe yourself, go for the takeUntil with ReplaySubject approach.
Note that when an observable emits a complete event, subscriptions will automatically be unsubscribed, and that certain frameworks using RxJS may also handle unsubscriptions automatically in certain cases.
Two choices come to mind depending upon your actual use case:
.finally()
source.finally(() => console.log("cleaning up")).singleInstance()
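As far as I understand RxJS 4, the finally action runs whenever a subscription ends, whether through termination or disposal, so without sharing it would fire once per subscriber; that is why singleInstance() is appended above. A dependency-free sketch of the per-subscription behaviour, where `withDisposeAction` and `fakeSource` are hypothetical stand-ins and not RxJS code:

```javascript
// Hypothetical stand-in for a dispose hook (not RxJS itself): wraps any
// object whose subscribe() returns something with a dispose() method.
function withDisposeAction(source, action) {
  return {
    subscribe: function (observer) {
      var inner = source.subscribe(observer);
      var disposed = false;
      return {
        dispose: function () {
          if (disposed) { return; }
          disposed = true;
          inner.dispose();
          action(); // runs for every subscription that is disposed
        }
      };
    }
  };
}

// A fake source that never emits; only the disposal path matters here.
var fakeSource = {
  subscribe: function () {
    return { dispose: function () {} };
  }
};

var actionRuns = 0;
var hooked = withDisposeAction(fakeSource, function () { actionRuns += 1; });

var first = hooked.subscribe(function () {});
var second = hooked.subscribe(function () {});
first.dispose();
second.dispose();
// Without sharing, the action has now run twice, once per subscription.
```

Sharing the subscription first is what collapses those two runs into a single run on the last disposal.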
.using()
Rx.Observable
    .using(
        // allocate some disposable resource during subscribe.
        // resource.dispose() will be called during unsubscribe.
        () => new SomeResource(),
        // use the disposable resource to create your observable
        // for example...
        resource => Rx.Observable.interval(resource.time))
    .singleInstance();
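The lifecycle that using() ties together can be sketched without the library: the resource is created when an observer subscribes, and its dispose() is called when that subscription is disposed. `usingSketch`, `makeResource` and the `events` log are hypothetical names; this only mimics the shape and is not how RxJS implements it.

```javascript
// Hypothetical mimic of the using() lifecycle (not RxJS code).
function usingSketch(resourceFactory, observableFactory) {
  return {
    subscribe: function (observer) {
      var resource = resourceFactory();          // allocated on subscribe
      var inner = observableFactory(resource).subscribe(observer);
      var disposed = false;
      return {
        dispose: function () {
          if (disposed) { return; }
          disposed = true;
          inner.dispose();
          resource.dispose();                    // released on unsubscribe
        }
      };
    }
  };
}

var events = [];
function makeResource() {
  events.push("allocated");
  return { dispose: function () { events.push("released"); } };
}

var stream = usingSketch(makeResource, function (resource) {
  // A fake observable that never emits.
  return { subscribe: function () { return { dispose: function () {} }; } };
});

events.push("before subscribe");
var usingSubscription = stream.subscribe(function () {});
usingSubscription.dispose();
// events: ["before subscribe", "allocated", "released"]
```

Nothing is allocated until someone subscribes. With a shared subscription in front, I would expect the allocation and release to happen once for the shared connection rather than once per subscriber.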