Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Cancel an Observable from the producer side, not consumer side

Cancelling from the consumer side, might be called using takeUntil, but that's not necessarily very dynamic. In this case, though, I am looking to cancel an Observable from the producer side of the equation, in the same way you might wish to cancel a Promise inside a promise chain (which is not very possible with the native utility).

Say I have this Observable being returned from a method. (This Queue library is a simple persistent queue that read/writes to a text file, we need to lock read/writes so nothing gets corrupted).

Queue.prototype.readUnique = function () {

    var ret = null;
    var lockAcquired = false;

    return this.obsEnqueue
        .flatMap(() => acquireLock(this))
        .flatMap(() => {
            lockAcquired = true;
            return removeOneLine(this)
        })
        .flatMap(val => {
            ret = val;   // this is not very good
            return releaseLock(this);
        })
        .map(() => {
            return JSON.parse(ret);
        })
        .catch(e => {
            if (lockAcquired) {
                return releaseLock(this);
            }
            else {
                return genericObservable();
            }
        });

};

I have 2 different questions -

  1. If I cannot acquire the lock, how can I "cancel" the observable, to just send back an empty Observable with no result(s)? Would I really have to do if/else logic in each return call to decide whether the current chain is cancelled and if so, return an empty Observable? By empty, I mean an Observable that simple fires onNext/onComplete without any possibility for errors and without any values for onNext. Technically, I don't think that's an empty Observable, so I am looking for what that is really called, if it exists.

  2. If you look at this particular sequence of code:

    .flatMap(() => acquireLock(this))
    .flatMap(() => {
        lockAcquired = true;
        return removeOneLine(this)
    })
    .flatMap(val => {
        ret = val;
        return releaseLock(this);
    })
    .map(() => {
        return JSON.parse(ret);
    })
    

what I am doing is storing a reference to ret at the top of the method and then referencing it again a step later. What I am looking for is a way to pass the value fired from removeOneLine() to JSON.parse(), without having to set some state outside the chain (which is simply inelegant).

like image 255
Alexander Mills Avatar asked Dec 23 '16 20:12

Alexander Mills


People also ask

Can you cancel an Observable?

Use the unsubscribe method A Subscription essentially just has an unsubscribe() function to release resources or cancel Observable executions. To prevent this memory leaks we have to unsubscribe from the subscriptions when we are done with. We do so by calling the unsubscribe method in the Observable.

How do I stop Observable Rxjava?

using() and subscribe to the Observable with a Subscriber so you can call subscriber. unsubscribe() when the user clicks the button to cancel. Here's an outline. This will stop the observable stream and call socketDisposer() to stop the network activity.

Does Observable complete after error?

The observable function calls complete when it has actually finished emitting all the values it had to emit. On the other hand, when an observable errors or completes, the subscriber object calls unsubscribe() , thus freeing the resources used by the subscription and the observable function.

Is subject hot or cold?

The Subject itself is hot/shared.


2 Answers

1) It depends on how your method acquireLock works - but I am assuming that it throws an error if it cannot acquire the lock, in that case you could create your stream with a catch and set the fallback stream to an empty one:

return Rx.Observable.catch(
        removeLine$,
        Rx.Observable.empty()
    );

2) To spare the stateful external variable you could simply chain a mapTo:

let removeLine$ = acquireLock(this)
    .flatMap(() => this.obsEnqueue
        .flatMap(() => removeOneLine(this))
        .flatMap(val => releaseLock(this).mapTo(val))
        .map(val => JSON.parse(val))
        .catch(() => releaseLock(this))
    );
like image 155
olsn Avatar answered Sep 29 '22 06:09

olsn


According to your definition of cancel, it is to prevent an observable from sending a value downstream. To prevent an observable from pushing a value, you can use filter:

It can be as simple as:

observable.filter(_ => lockAcquired)

This will only send a notification downstream if lockAcquired is true.

like image 26
Asti Avatar answered Sep 29 '22 07:09

Asti