I have fire and forget operations that don't take very long, but long enough that I wanted to run them in the background reactively. There lifetime is basically while the connection to the server exists or 10 seconds has passed. I don't know how to store these so that I can clean them out efficiently while still maintaining the correct life cycle.
Subscription fireAndForget = service.doSomething()
.timeout(10, TimeUnit.SECONDS)
.subscribe( () -> {
otherService.doAction()
}, (e) -> { Log.e("error", e) });
// what do I do with fireAndForget???
// If I use a CompositeSubscription
subscriptions.add(fireAndForget) // how does it get removed????
I could have a CompositeSubscription on my connection that keep holds of them, but then how do I clear them out when the operation is finished? Should I even bother clearing out a CompositeSubscription when the subscription unsubscribes? I am relatively new to Rx so I am not sure if I am just trying to do something Rx wasn't meant to do.
Hint: Fire and forget
You are not forgetting if you keep a subscription around. Just call subscribe and move on.
service.doSomething()
.timeout(10, TimeUnit.SECONDS)
.subscribe( () -> {
otherService.doAction()
}
Edit: Garbage collection.
Unless you do something very strange (like using WeakReference), your executing code in doSomething
will prevent the entire chain from being garbage collected.
Think of RxJava like an onion, every time you transform an observable (map, doOnNext, etc.) a new Observable is created that wraps the old observable like an onion.
For each transformation a new Subscriber
is created that passes on the callbacks (onNext, onCompleted, onError) to the next Subscriber
in the chain.
The main reason for a Subscription
is so that you can call unsubscribe
. There are two reason you may want to call unsubscribe.
You have a hot observable. Basically this observable will emit values forever. An example could be an observable the emits the time every 30 seconds. You can call unsubscribe
when you are no longer interested the time value.
You want to cancel a long operation. Lets say you are loading a webpage from to display to the user, if the user presses back, you want to cancel the loading (you no longer care about the results).
Example Source Code for map operation
Map simply calls lift
with an OperatorMap
as the argument. Lift creates a new Observable
based upon an Operator
, it can be ignored for the time being.
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
OperatorMap
creates a new Subscriber
that mostly just passes calls into whatever Subscriber
it is given. This could be the Subscriber
you pass into subscribe
or a Subscriber
created by another map
transformation, it does not really matter.
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
Forget about to get the subscription unless you want to know if is still subscribe or not. By design Observable will unsubcribe the observer onComplete. Then since the instance is not used anymore, the GC at some point will remove it.
just refactor your code:
service.doSomething().timeout(10, TimeUnit.SECONDS)
.subscribe( () -> {
otherService.doAction()
}, (e) -> { Log.e("error", e) });
You can see examples about how subscribe works here.
https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With