RxJava Observable.cache invalidate

I am trying to learn RxJava in an Android environment. Let's say I have an observable that emits the result of a network call. If I understood correctly, a common approach to dealing with configuration changes is to:

  • store the observable in a retained fragment / singleton / application object

  • apply the cache operator to the observable

  • subscribe / unsubscribe in the proper lifecycle handlers

Doing this, we would not lose the result of the observable, which will be re-observed once the new configuration has taken effect.

Now, my question is:

Is there a way to force the observable to emit a new value (and invalidate the cached one)? Do I need to create a new observable every time I want fresh data from the network (which does not sound like good practice in the Android world, because it would make the GC do extra work)?

Thanks a lot,

Federico

fedepaol asked Jul 30 '15 20:07


1 Answer

Make a custom OnSubscribe implementation that does what you want:

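// Nested inside an enclosing class; needs these imports (RxJava 1.x):
// import java.util.concurrent.atomic.AtomicBoolean;
// import rx.Observable;
// import rx.Observable.OnSubscribe;
// import rx.Subscriber;
public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {

    // true when the next subscription should build a fresh cache
    private final AtomicBoolean refresh = new AtomicBoolean(true);
    private final Observable<T> source;
    // the currently active cached observable; replaced after each reset
    private volatile Observable<T> current;

    public OnSubscribeRefreshingCache(Observable<T> source) {
        this.source = source;
        this.current = source;
    }

    // Marks the cache as stale; the next subscriber triggers a new source.cache()
    public void reset() {
        refresh.set(true);
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {
        // Only the first subscriber after a reset swaps in a new cached observable
        if (refresh.compareAndSet(true, false)) {
            current = source.cache();
        }
        current.unsafeSubscribe(subscriber);
    }

}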

This bit of code demonstrates usage and shows that cache is essentially being reset:

Observable<Integer> o = Observable.just(1)
        .doOnCompleted(() -> System.out.println("completed"));
OnSubscribeRefreshingCache<Integer> cacher = 
    new OnSubscribeRefreshingCache<Integer>(o);
Observable<Integer> o2 = Observable.create(cacher);
o2.subscribe(System.out::println);
o2.subscribe(System.out::println);
cacher.reset();
o2.subscribe(System.out::println);

Output:

completed
1
1
completed
1

By the way, you may notice that .cache doesn't emit until completion. This is a bug that should be fixed in RxJava 1.0.14.

In terms of your GC pressure concerns, every operator, when applied to an Observable, creates a new Observable, usually via lift or create. The base member state associated with creating a new Observable is the reference to the onSubscribe function. cache differs from most operators in that it holds state across subscriptions, so it can cause GC pressure if it holds a lot of state and is thrown away frequently. Even if you reused the same mutable data structure to hold the state across resets, the GC would still have to deal with the contents of the data structure when it is cleared, so you might not gain much.

The RxJava cache operator is built for multiple concurrent subscriptions, so you can probably imagine that reset functionality could prove problematic to implement. By all means raise an issue on the RxJava GitHub repository if you want to explore further.
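To make the swap-on-reset idea easier to see in isolation, here is a minimal sketch in plain Java with no RxJava dependency. It is only an analogy to the class above, not RxJava API: the names ResettableCache and Generation are hypothetical, and a Supplier stands in for the network call. Each reset installs a fresh "generation", so a caller that already holds the old one still finishes with the old value, while the discarded generation is left for the GC, which is the cost discussed above.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical illustration: a value cache that can be invalidated on demand.
final class ResettableCache<T> {

    // One generation of the cache; it invokes the loader at most once.
    private static final class Generation<T> {
        private final Supplier<T> loader;
        private T value;
        private boolean loaded;

        Generation(Supplier<T> loader) {
            this.loader = loader;
        }

        synchronized T get() {
            if (!loaded) {
                value = loader.get();
                loaded = true;
            }
            return value;
        }
    }

    private final Supplier<T> loader;
    private final AtomicReference<Generation<T>> current;

    ResettableCache(Supplier<T> loader) {
        this.loader = loader;
        this.current = new AtomicReference<>(new Generation<>(loader));
    }

    // Discards the cached value by swapping in a new, empty generation.
    void reset() {
        current.set(new Generation<>(loader));
    }

    // Returns the cached value, loading it on first use after construction or reset.
    T get() {
        return current.get().get();
    }
}
```

With this sketch, two consecutive get() calls invoke the loader once and return the same value; after reset(), the next get() invokes the loader again.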

Dave Moten answered Sep 24 '22 01:09