The use case is this: I want to temporarily cache the latest emitted expensive Observable response, but after it expires, return to the expensive source Observable and cache it again, etc.
A pretty basic network cache scenario, but I'm really struggling to get it working.
private Observable<String> getContentObservable() {
// expensive upstream source (API, etc.)
Observable<String> sourceObservable = getSourceObservable();
// cache 1 result for 30 seconds, then return to the source
return sourceObservable
.replay(1, 30, TimeUnit.SECONDS)
.autoConnect()
.switchIfEmpty(sourceObservable);
}
Initial request: goes to source Second request within 30 seconds of source emitting: delivered from cache Third request outside of cache expiry window: nothing. I subscribe to it and I get no data, but it's not switching to the upstream source Observable.
It looks as if I'm just connecting to my ConnectableObservable from autoConnect()
and it's never completing with empty, so it's never triggering my switchIfEmpty()
.
How can I use this combination of replay(1,x,x)
and switchIfEmpty()
?
Or am I just approaching this wrong from the start?
return sourceObservable .replay(1, 30, TimeUnit.SECONDS) .autoConnect() .switchIfEmpty(sourceObservable);
Initial request: goes to source Second request within 30 seconds of source emitting: delivered from cache Third request outside of cache expiry window: nothing. I subscribe to it and I get no data, but it's not switching to the upstream source Observable.
The problem here is, that replay just repeating the same sequence emitted by the sourceObservable
in the last 30 sec, but when you subscribe after 30 sec, the sequence has no events, even no onCompleted()
, so you can't switchIfEmpty()
, it will not work as it's depends on 'onCompleted()'
signal and without any emissions, to know that it's 'empty'.
In general, using replay is not suffice in cache scenario, as what you need is a way to resubscribe again in case the cache is expired, and additionally do it by demand, meaning only when some client subscribe to it. (you can do cache that refresh itself every 30 sec, but that's not the desired behavior I guess)
So, as @Yurly Kulikov suggested, you need to maintain a state, and to control the subscription operation for maintaining the state. But I think there is a major flow in the solution, as it's actually not exatcly thread-safe, meaning if 2 subscribes to it 1 after the another, say A and B, while A executes the request and waits in order to save the new result in the cache, B can subscribe as well, and another request will be executed as cached value didn't set yet by A (it didn't finished yet the first network request.
I suggest to use similar approach with a different implementation, that I suggested here:
public class CachedRequest<T> {
private final AtomicBoolean expired = new AtomicBoolean(true);
private final Observable<T> source;
private final long cacheExpirationInterval;
private final TimeUnit cacheExpirationUnit;
private Observable<T> current;
public CachedRequest(Observable<T> o, long cacheExpirationInterval,
TimeUnit cacheExpirationUnit) {
source = o;
current = o;
this.cacheExpirationInterval = cacheExpirationInterval;
this.cacheExpirationUnit = cacheExpirationUnit;
}
private Observable<T> getCachedObservable() {
return Observable.defer(() -> {
if (expired.compareAndSet(true, false)) {
current = source.cache();
Observable.timer(cacheExpirationInterval, cacheExpirationUnit)
.subscribe(aLong -> expired.set(true));
}
return current;
});
}
}
with defer you can return the right Observable according to cache expiration status, so every subscribe happened within the cache expiration will get cached Observable (using cache()) - meaning request will be performed only once. after cache expiration, additional subscribe will trigger new request and will set a new timer to reset the cache expiration.
So it's turns out you can use Jake Wharton's replaying share to cache the last value even after dispose. https://github.com/JakeWharton/RxReplayingShare
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With