I am trying to learn RxJava in an Android environment. Let's say I have an observable that emits the result of a network call. If I understand correctly, a common approach to dealing with configuration changes is to:
store the observable in a retained fragment / singleton / application object
apply the cache operator to the observable
subscribe / unsubscribe in the proper lifecycle handlers
Doing this, we would not lose the result of the observable, which will be re-observed once the new configuration has taken effect.
Now, my question is:
Is there a way to force the observable to emit a new value (and invalidate the cached one)? Do I need to create a new observable every time I want fresh data from the network (which sounds like bad practice in the Android world because it would make the GC do extra work)?
Thanks a lot,
Federico
Make a custom OnSubscribe implementation that does what you want:
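    // RxJava 1.x imports (these go at the top of the enclosing file):
    import java.util.concurrent.atomic.AtomicBoolean;
    import rx.Observable;
    import rx.Observable.OnSubscribe;
    import rx.Subscriber;

    public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {

        // true when the next subscription should rebuild the cached Observable
        private final AtomicBoolean refresh = new AtomicBoolean(true);
        private final Observable<T> source;
        private volatile Observable<T> current;

        public OnSubscribeRefreshingCache(Observable<T> source) {
            this.source = source;
            this.current = source;
        }

        // Invalidate the cache; the next subscriber triggers a fresh source.cache()
        public void reset() {
            refresh.set(true);
        }

        @Override
        public void call(Subscriber<? super T> subscriber) {
            // Only the first subscriber after a reset swaps in a new cached Observable
            if (refresh.compareAndSet(true, false)) {
                current = source.cache();
            }
            current.unsafeSubscribe(subscriber);
        }
    }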
This bit of code demonstrates usage and shows that cache is essentially being reset:
    Observable<Integer> o = Observable.just(1)
            .doOnCompleted(() -> System.out.println("completed"));
    OnSubscribeRefreshingCache<Integer> cacher =
            new OnSubscribeRefreshingCache<Integer>(o);
    Observable<Integer> o2 = Observable.create(cacher);
    o2.subscribe(System.out::println);
    o2.subscribe(System.out::println);
    cacher.reset();
    o2.subscribe(System.out::println);
Output:
completed
1
1
completed
1
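The reset mechanics can also be seen without RxJava. The following is only a minimal plain-Java sketch of the same "refresh flag" idea, not the RxJava operator: RefreshingCacheDemo, RefreshingCache and networkCalls are hypothetical names made up for illustration, and a counter stands in for the network call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RefreshingCacheDemo {

    /** Hypothetical helper: memoizes a Supplier until reset() is called. */
    static final class RefreshingCache<T> implements Supplier<T> {
        private final Supplier<T> source;
        private boolean refresh = true; // true means the next get() must reload
        private T value;

        RefreshingCache(Supplier<T> source) {
            this.source = source;
        }

        /** Invalidate the cached value; the next get() calls the source again. */
        synchronized void reset() {
            refresh = true;
        }

        @Override
        public synchronized T get() {
            if (refresh) {
                value = source.get();
                refresh = false;
            }
            return value;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a network call: each real invocation bumps the counter.
        AtomicInteger networkCalls = new AtomicInteger();
        RefreshingCache<Integer> cache =
                new RefreshingCache<>(networkCalls::incrementAndGet);

        System.out.println(cache.get()); // first call hits the source
        System.out.println(cache.get()); // served from the cache
        cache.reset();
        System.out.println(cache.get()); // source is hit again after reset
    }
}
```

As in the answer's class, consumers keep a single long-lived object and only the cached state behind it is swapped, so nothing has to be re-wired after a reset.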
By the way, you may notice that .cache doesn't emit till completion. This is a bug that should be fixed by RxJava 1.0.14.
In terms of your GC pressure concerns, every operator, when applied to an Observable, creates a new Observable, usually via lift or create. The base member state associated with creating a new Observable is the reference to the onSubscribe function. cache is different from most in that it holds state across subscriptions, and this holds potential for GC pressure if it holds a lot of state and is thrown away frequently. Even if you used the same mutable data structure to hold the state across resets, the GC would still have to deal with the contents of the data structure when cleared, so you might not gain much.
The RxJava cache operator is built for multiple concurrent subscriptions. You can probably imagine that reset functionality could prove problematic to implement. By all means raise an issue on the RxJava GitHub if you want to explore further.