
How to clean up RxJava fire and forget subscriptions?

I have fire-and-forget operations that don't take very long, but long enough that I want to run them reactively in the background. Their lifetime is basically as long as the connection to the server exists, or until 10 seconds have passed. I don't know how to store these so that I can clean them out efficiently while still maintaining the correct life cycle.

Subscription fireAndForget = service.doSomething()
                                    .timeout(10, TimeUnit.SECONDS)
                                    .subscribe(() -> {
                                        otherService.doAction();
                                    }, (e) -> { Log.e("error", e); });
// what do I do with fireAndForget???

// If I use a CompositeSubscription
subscriptions.add(fireAndForget); // how does it get removed????

I could have a CompositeSubscription on my connection that keeps hold of them, but then how do I clear them out when the operation is finished? Should I even bother removing a subscription from the CompositeSubscription once it unsubscribes? I am relatively new to Rx, so I am not sure if I am just trying to do something Rx wasn't meant to do.

David Stocking asked Jun 09 '16 16:06


2 Answers

Hint: Fire and forget

You are not forgetting if you keep a subscription around. Just call subscribe and move on.

service.doSomething()
    .timeout(10, TimeUnit.SECONDS)
    .subscribe(() -> {
        otherService.doAction();
    });

Edit: Garbage collection.

Unless you do something very strange (like using WeakReference), the code executing in doSomething will keep the entire chain from being garbage collected while it is running.

Think of RxJava like an onion: every time you transform an observable (map, doOnNext, etc.), a new Observable is created that wraps the old one.

For each transformation a new Subscriber is created that passes on the callbacks (onNext, onCompleted, onError) to the next Subscriber in the chain.

The main reason for a Subscription is so that you can call unsubscribe. There are two reasons you may want to call unsubscribe.

  1. You have a hot observable. Basically, this observable will emit values forever. An example could be an observable that emits the time every 30 seconds. You can call unsubscribe when you are no longer interested in the time value.

  2. You want to cancel a long operation. Let's say you are loading a webpage to display to the user. If the user presses back, you want to cancel the loading (you no longer care about the result).
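
If you do need the second case (cancel whatever is still running when the connection drops), the bookkeeping can be sketched roughly as below. This is a plain-Java model, not RxJava code: Operation, InFlightOperations and InFlightDemo are hypothetical names made up for illustration. In RxJava 1 the corresponding calls on CompositeSubscription are add, remove and clear; where you hook the removal in depends on your chain, so that part is left as an assumption.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical plain-Java model; none of these are RxJava classes.
final class Operation {
    private volatile boolean cancelled;

    void cancel() { cancelled = true; }

    boolean isCancelled() { return cancelled; }
}

final class InFlightOperations {
    // Concurrent set so terminal callbacks may fire from any thread.
    private final Set<Operation> inFlight = ConcurrentHashMap.newKeySet();

    // Start tracking an operation for the lifetime of the connection.
    void track(Operation op) { inFlight.add(op); }

    // Call from the operation's terminal callback (success, error, timeout).
    void finished(Operation op) { inFlight.remove(op); }

    // Call when the connection closes: cancel whatever is still running.
    void cancelAll() {
        for (Operation op : inFlight) {
            op.cancel();
        }
        inFlight.clear();
    }

    int size() { return inFlight.size(); }
}

final class InFlightDemo {
    // Returns {size while both run, size after one finishes, size after disconnect}.
    static int[] run(Operation quick, Operation slow) {
        InFlightOperations ops = new InFlightOperations();
        ops.track(quick);
        ops.track(slow);
        int whileRunning = ops.size();
        ops.finished(quick);   // quick completed on its own and removed itself
        int afterQuick = ops.size();
        ops.cancelAll();       // connection dropped, slow gets cancelled
        return new int[] { whileRunning, afterQuick, ops.size() };
    }
}
```

The point of the model is that a finished operation removes itself, so the set only ever holds work that is still in flight, and a disconnect cancels only that remainder.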

Example Source Code for map operation

Map simply calls lift with an OperatorMap as the argument. Lift creates a new Observable based upon an Operator; it can be ignored for the time being.

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

OperatorMap creates a new Subscriber that mostly just passes calls on to whatever Subscriber it is given. This could be the Subscriber you pass into subscribe or a Subscriber created by another map transformation; it does not really matter.

public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

}
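
To watch the wrapping happen without pulling in RxJava, here is a small self-contained model of the same idea. MiniObservable, MiniSubscriber and OnionDemo are hypothetical stand-ins written for this sketch, not RxJava types, and error handling is left out to keep it short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified stand-ins for illustration; NOT RxJava's types.
interface MiniSubscriber<T> {
    void onNext(T value);
    void onCompleted();
}

interface MiniObservable<T> {
    void subscribe(MiniSubscriber<T> downstream);

    // Each call to map returns a new layer that wraps this observable.
    default <R> MiniObservable<R> map(Function<? super T, ? extends R> f) {
        MiniObservable<T> upstream = this;
        // Subscribing to the new layer subscribes a wrapping subscriber upstream.
        return downstream -> upstream.subscribe(new MiniSubscriber<T>() {
            @Override
            public void onNext(T value) {
                downstream.onNext(f.apply(value)); // transform, then pass on
            }

            @Override
            public void onCompleted() {
                downstream.onCompleted(); // pass through unchanged
            }
        });
    }

    // Innermost layer: emits one value and completes.
    static <T> MiniObservable<T> just(T value) {
        return downstream -> {
            downstream.onNext(value);
            downstream.onCompleted();
        };
    }
}

final class OnionDemo {
    // Records the events that reach the outermost subscriber.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        MiniObservable.just(20)
            .map(n -> n + 1)
            .map(n -> "value=" + n)
            .subscribe(new MiniSubscriber<String>() {
                @Override
                public void onNext(String v) { events.add(v); }

                @Override
                public void onCompleted() { events.add("completed"); }
            });
        return events;
    }
}
```

OnionDemo.run() returns the two events "value=21" and "completed": the value passed through both map layers before reaching the final subscriber.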

cyroxis answered Sep 22 '22 14:09


You don't need to keep the Subscription unless you want to check whether it is still subscribed. By design, an Observable unsubscribes the observer when it terminates (onCompleted or onError). Once the instance is no longer referenced, the GC will eventually collect it.

Just refactor your code:

    service.doSomething()
           .timeout(10, TimeUnit.SECONDS)
           .subscribe(() -> {
               otherService.doAction();
           }, (e) -> { Log.e("error", e); });

You can see examples of how subscribe works here:

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java
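
The "unsubscribes itself when it terminates" behaviour can be pictured with a tiny plain-Java model. SelfReleasingSubscriber is a hypothetical class made up for this sketch and is not part of RxJava; it only shows why nothing is left to clean up after a terminal event.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a subscriber handle; not an RxJava class.
final class SelfReleasingSubscriber {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    private final Runnable onCompleted;

    SelfReleasingSubscriber(Runnable onCompleted) {
        this.onCompleted = onCompleted;
    }

    // Terminal event: deliver it once, then release the handle.
    void complete() {
        if (unsubscribed.get()) {
            return; // already finished or cancelled, so the event is dropped
        }
        try {
            onCompleted.run();
        } finally {
            unsubscribed.set(true); // released even if the callback throws
        }
    }

    // Manual cancellation, only needed before the terminal event arrives.
    void unsubscribe() { unsubscribed.set(true); }

    boolean isUnsubscribed() { return unsubscribed.get(); }
}
```

After complete() has run, isUnsubscribed() is true and further events are ignored, which is the reason there is usually no need to hold on to the handle for a short fire-and-forget call.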

paul answered Sep 21 '22 14:09