Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava pattern for requesting a remote Observable with a temporary cache

The use case is this: I want to temporarily cache the latest emitted expensive Observable response, but after it expires, return to the expensive source Observable and cache it again, etc.

A pretty basic network cache scenario, but I'm really struggling to get it working.

private Observable<String> getContentObservable() {

    // expensive upstream source (API, etc.)
    Observable<String> sourceObservable = getSourceObservable();

    // cache 1 result for 30 seconds, then return to the source
    return sourceObservable
            .replay(1, 30, TimeUnit.SECONDS)
            .autoConnect()
            .switchIfEmpty(sourceObservable);
}

Initial request: goes to source Second request within 30 seconds of source emitting: delivered from cache Third request outside of cache expiry window: nothing. I subscribe to it and I get no data, but it's not switching to the upstream source Observable.

It looks as if I'm just connecting to my ConnectableObservable from autoConnect() and it's never completing with empty, so it's never triggering my switchIfEmpty().

How can I use this combination of replay(1,x,x) and switchIfEmpty()?

Or am I just approaching this wrong from the start?

like image 935
Neil MacMillan Avatar asked Mar 31 '17 23:03

Neil MacMillan


2 Answers

return sourceObservable
            .replay(1, 30, TimeUnit.SECONDS)
            .autoConnect()
            .switchIfEmpty(sourceObservable);

Initial request: goes to source Second request within 30 seconds of source emitting: delivered from cache Third request outside of cache expiry window: nothing. I subscribe to it and I get no data, but it's not switching to the upstream source Observable.

The problem here is, that replay just repeating the same sequence emitted by the sourceObservable in the last 30 sec, but when you subscribe after 30 sec, the sequence has no events, even no onCompleted(), so you can't switchIfEmpty(), it will not work as it's depends on 'onCompleted()' signal and without any emissions, to know that it's 'empty'.

In general, using replay is not suffice in cache scenario, as what you need is a way to resubscribe again in case the cache is expired, and additionally do it by demand, meaning only when some client subscribe to it. (you can do cache that refresh itself every 30 sec, but that's not the desired behavior I guess)


So, as @Yurly Kulikov suggested, you need to maintain a state, and to control the subscription operation for maintaining the state. But I think there is a major flow in the solution, as it's actually not exatcly thread-safe, meaning if 2 subscribes to it 1 after the another, say A and B, while A executes the request and waits in order to save the new result in the cache, B can subscribe as well, and another request will be executed as cached value didn't set yet by A (it didn't finished yet the first network request.

I suggest to use similar approach with a different implementation, that I suggested here:

public class CachedRequest<T> {

private final AtomicBoolean expired = new AtomicBoolean(true);
private final Observable<T> source;
private final long cacheExpirationInterval;
private final TimeUnit cacheExpirationUnit;
private Observable<T> current;

    public CachedRequest(Observable<T> o, long cacheExpirationInterval,
                         TimeUnit cacheExpirationUnit) {
        source = o;
        current = o;
        this.cacheExpirationInterval = cacheExpirationInterval;
        this.cacheExpirationUnit = cacheExpirationUnit;
   }

    private Observable<T> getCachedObservable() {
        return Observable.defer(() -> {
            if (expired.compareAndSet(true, false)) {
                current = source.cache();
                Observable.timer(cacheExpirationInterval, cacheExpirationUnit)                          
                        .subscribe(aLong -> expired.set(true));
            }
            return current;
        });
    }
}

with defer you can return the right Observable according to cache expiration status, so every subscribe happened within the cache expiration will get cached Observable (using cache()) - meaning request will be performed only once. after cache expiration, additional subscribe will trigger new request and will set a new timer to reset the cache expiration.

like image 125
yosriz Avatar answered Nov 01 '22 19:11

yosriz


So it's turns out you can use Jake Wharton's replaying share to cache the last value even after dispose. https://github.com/JakeWharton/RxReplayingShare

like image 1
Phoenix Wang Avatar answered Nov 01 '22 19:11

Phoenix Wang