I am running into a pattern frequently and I'm not quite sure how to get around it effectively.
Basically, if I have an Observable<T>
holding an expensive item T
, I do not want to re-build that T
item every time something uses it, or map it to a thousand different other observables which will result in it being built 1000 times.
So I started using replay()
to cache it for a period, but ideally I would like it to clear the cache when the emissions go idle for a period.
Is there an operator or some transformer I can use that accomplishes this?
public final class ActionManager {
private final Observable<ImmutableList<Action>> actionMap;
private ActionManager() {
this.actionMap = Observable.defer(() -> buildExpensiveList()).replay(10, TimeUnit.SECONDS).autoConnect();
}
//this method could get called thousands of times
//I don't want to rebuild the map for every call
public Observable<Action> forItem(Item item) {
actionMap.map(l -> //get Action for item);
}
}
UPDATE
Trying to implement this into a Transformer/Operator combo. Is there something I'm doing wrong here?
public static <T> Transformer<T,T> recacheOnIdle(long time, TimeUnit timeUnit) {
return obs -> obs.timeout(time, timeUnit).lift(new Operator<T,T>() {
private volatile T cachedItem;
private volatile boolean isCurrent = false;
@Override
public Subscriber<? super T> call(Subscriber<? super T> s) {
return new Subscriber<T>(s) {
@Override
public void onCompleted() {
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if(!s.isUnsubscribed()) {
if (e instanceof TimeoutException) {
isCurrent = false;
cachedItem = null;
} else {
s.onError(e);
}
}
}
@Override
public void onNext(T t) {
if(!s.isUnsubscribed()) {
if (!isCurrent) {
cachedItem = t;
}
s.onNext(cachedItem);
}
}
};
}
});
}
You may be able to use the timeout operator and a connectable observable (to hold and synchronize more than one subscriber):
mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
Ths way you can respond to the error thrown by re-caching the expensive item. Assuming that's a "rare" case:
// if no emissions are made for a period of 3 seconds - will call onError
observableWithCache.timeout(3000, TimeUnit.MILLISECONDS).subscribe(new Subscriber<SomeObject>() {
public void onCompleted() {
}
public void onError(Throwable arg0) {
doClearCache(); // make sure to re-subscribe with timeout
}
public void onNext(SomeObject item) {
System.out.println("Got item: " + item); // you can ignore this
}
});
Note that onError
doesn't cancel the original observable, as the diagram shows:
But you can react to a period of time during which no emissions were made.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With