Cancelling from the consumer side can be done with takeUntil, but that is not very dynamic. In this case, though, I am looking to cancel an Observable from the producer side of the equation, in the same way you might wish to cancel a Promise inside a promise chain (which is not really possible with native Promises).
Say I have this Observable being returned from a method. (This Queue library is a simple persistent queue that reads from and writes to a text file; we need to lock reads and writes so nothing gets corrupted.)
Queue.prototype.readUnique = function () {
  var ret = null;
  var lockAcquired = false;
  return this.obsEnqueue
    .flatMap(() => acquireLock(this))
    .flatMap(() => {
      lockAcquired = true;
      return removeOneLine(this)
    })
    .flatMap(val => {
      ret = val; // this is not very good
      return releaseLock(this);
    })
    .map(() => {
      return JSON.parse(ret);
    })
    .catch(e => {
      if (lockAcquired) {
        return releaseLock(this);
      }
      else {
        return genericObservable();
      }
    });
};
I have two different questions:
1) If I cannot acquire the lock, how can I "cancel" the observable, to just send back an empty Observable with no result(s)? Would I really have to do if/else logic in each return call to decide whether the current chain is cancelled and if so, return an empty Observable? By empty, I mean an Observable that simply fires onNext/onComplete without any possibility for errors and without any values for onNext. Technically, I don't think that's an empty Observable, so I am looking for what that is really called, if it exists.
2) If you look at this particular sequence of code:
  .flatMap(() => acquireLock(this))
  .flatMap(() => {
    lockAcquired = true;
    return removeOneLine(this)
  })
  .flatMap(val => {
    ret = val;
    return releaseLock(this);
  })
  .map(() => {
    return JSON.parse(ret);
  })
What I am doing is storing the value in ret, declared at the top of the method, and then reading it again a step later. What I am looking for is a way to pass the value fired from removeOneLine() to JSON.parse() without having to set some state outside the chain (which is simply inelegant).
Use the unsubscribe method. A Subscription essentially just has an unsubscribe() function to release resources or cancel Observable executions. To prevent memory leaks, unsubscribe from subscriptions when you are done with them, by calling unsubscribe() on the Subscription returned from subscribe().
Use using() and subscribe to the Observable with a Subscriber so you can call subscriber.unsubscribe() when the user clicks the button to cancel. This will stop the observable stream and call socketDisposer() to stop the network activity.
The observable function calls complete when it has finished emitting all the values it had to emit. When an observable errors or completes, the subscriber object calls unsubscribe(), thus freeing the resources used by the subscription and the observable function.
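To make the teardown behaviour described above concrete, here is a minimal sketch. It uses a hand-rolled stand-in for an Observable, not the RxJS implementation; createObservable and the log messages are hypothetical names invented for this illustration. The point is only that the teardown function runs once, whether the consumer unsubscribes or the producer completes.

```javascript
// Hand-rolled stand-in for an Observable (NOT RxJS itself).
// `producer` receives an observer and returns a teardown function.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false;
      let teardown = null;
      // Runs the teardown at most once.
      const close = () => {
        if (closed) return;
        closed = true;
        if (teardown) teardown();
      };
      // Wrapper that ignores notifications once the subscription is closed.
      const safe = {
        next: v => { if (!closed) observer.next(v); },
        error: e => { if (!closed) { if (observer.error) observer.error(e); close(); } },
        complete: () => { if (!closed) { if (observer.complete) observer.complete(); close(); } }
      };
      const t = producer(safe);
      // If the producer already finished synchronously, release right away.
      if (closed) { if (t) t(); } else { teardown = t; }
      return { unsubscribe: close };
    }
  };
}

// Case 1: the consumer cancels by unsubscribing.
const log = [];
let emit; // handle used to push values by hand in this demo
const source = createObservable(observer => {
  emit = v => observer.next(v);
  log.push('resource opened');
  return () => log.push('resource released');
});
const sub = source.subscribe({ next: v => log.push('got ' + v) });
emit(1);
sub.unsubscribe();
emit(2); // ignored, the subscription is closed

// Case 2: the producer completes, which also triggers the teardown.
const finiteLog = [];
const finite = createObservable(observer => {
  observer.next('a');
  observer.complete();
  return () => finiteLog.push('released');
});
finite.subscribe({
  next: v => finiteLog.push('got ' + v),
  complete: () => finiteLog.push('done')
});

console.log(log);       // [ 'resource opened', 'got 1', 'resource released' ]
console.log(finiteLog); // [ 'got a', 'done', 'released' ]
```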
The Subject itself is hot/shared.
1) It depends on how your method acquireLock works, but I am assuming that it throws an error if it cannot acquire the lock. In that case you could create your stream with a catch and set the fallback stream to an empty one:
return Rx.Observable.catch(
  removeLine$,
  Rx.Observable.empty()
);
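As a rough illustration of what that fallback does, the sketch below models streams as plain functions rather than using RxJS. The names empty, of, fail, catchWith and record are hypothetical stand-ins made up for this example. It shows that an empty stream sends only a completion notification (no values, no error), and that the fallback is used only when the source errors.

```javascript
// Hand-rolled stand-ins, NOT the RxJS implementation: a "stream" here is just
// a function that takes an observer and pushes notifications to it.
const empty = () => observer => observer.complete();
const of = value => observer => { observer.next(value); observer.complete(); };
const fail = err => observer => observer.error(err);

// Mirrors the idea of catch: on error, switch to the fallback stream.
const catchWith = (source, fallback) => observer =>
  source({
    next: v => observer.next(v),
    error: () => fallback(observer),
    complete: () => observer.complete()
  });

// Subscribes and collects every notification so the outcome can be inspected.
function record(source) {
  const events = [];
  source({
    next: v => events.push(['next', v]),
    error: e => events.push(['error', e.message]),
    complete: () => events.push(['complete'])
  });
  return events;
}

// Assumption: acquiring the lock fails by emitting an error.
const lockFailed = record(catchWith(fail(new Error('lock busy')), empty()));
// When nothing goes wrong, the fallback is never used.
const lockOk = record(catchWith(of('{"id":1}'), empty()));

console.log(lockFailed); // [ [ 'complete' ] ]
console.log(lockOk);     // [ [ 'next', '{"id":1}' ], [ 'complete' ] ]
```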
2) To spare the stateful external variable you could simply chain a mapTo:
let removeLine$ = acquireLock(this)
  .flatMap(() => this.obsEnqueue
    .flatMap(() => removeOneLine(this))
    .flatMap(val => releaseLock(this).mapTo(val))
    .map(val => JSON.parse(val))
    .catch(() => releaseLock(this))
  );
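The reason this works is that val stays in the closure of the flatMap callback, so the release step can re-emit it. Below is a small sketch of that idea using plain functions as streams instead of RxJS. The helpers of, map and flatMap are simplified, hypothetical stand-ins, and removeOneLine and releaseLock are stubbed with made-up behaviour just for the demo.

```javascript
// Hand-rolled stand-ins, NOT the RxJS implementation.
const of = value => observer => { observer.next(value); observer.complete(); };

const map = (source, fn) => observer =>
  source({
    next: v => observer.next(fn(v)),
    error: e => observer.error(e),
    complete: () => observer.complete()
  });

// Simplified flatMap: completes once the outer and all inner streams are done.
const flatMap = (source, project) => observer => {
  let active = 0;
  let outerDone = false;
  const maybeComplete = () => { if (outerDone && active === 0) observer.complete(); };
  source({
    next: v => {
      active++;
      project(v)({
        next: inner => observer.next(inner),
        error: e => observer.error(e),
        complete: () => { active--; maybeComplete(); }
      });
    },
    error: e => observer.error(e),
    complete: () => { outerDone = true; maybeComplete(); }
  });
};

const log = [];
// Stubs for the demo (assumed behaviour, not the real Queue helpers).
const removeOneLine = () => of('{"id":7}');
const releaseLock = () => observer => {
  log.push('released');
  observer.next(undefined);
  observer.complete();
};

// `line` is captured by the closure, so the release step can hand it back,
// which is what mapTo(val) does in the answer above.
const result$ = map(
  flatMap(removeOneLine(), line => map(releaseLock(), () => line)),
  line => JSON.parse(line)
);

const values = [];
result$({
  next: v => values.push(v),
  error: e => { throw e; },
  complete: () => log.push('complete')
});

console.log(values); // [ { id: 7 } ]
console.log(log);    // [ 'released', 'complete' ]
```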
By your definition, cancelling means preventing an observable from sending a value downstream. To do that, you can use filter.
It can be as simple as:
observable.filter(_ => lockAcquired)
This will only send a notification downstream if lockAcquired is true.