Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava- Caching until emissions go idle for a period?

I am running into a pattern frequently and I'm not quite sure how to get around it effectively.

Basically, if I have an Observable<T> holding an expensive item T, I do not want to re-build that T item every time something uses it, or map it to a thousand different other observables which will result in it being built 1000 times.

So I started using replay() to cache it for a period, but ideally I would like it to clear the cache when the emissions go idle for a period.

Is there an operator or some transformer I can use that accomplishes this?

public final class ActionManager {

    private final Observable<ImmutableList<Action>> actionMap;

    private ActionManager() { 
        this.actionMap = Observable.defer(() -> buildExpensiveList()).replay(10, TimeUnit.SECONDS).autoConnect();
    }

    //this method could get called thousands of times
    //I don't want to rebuild the map for every call

    public Observable<Action> forItem(Item item) { 
        actionMap.map(l -> //get Action for item);
    }


}

UPDATE

Trying to implement this into a Transformer/Operator combo. Is there something I'm doing wrong here?

   public static <T> Transformer<T,T> recacheOnIdle(long time, TimeUnit timeUnit) { 

        return obs -> obs.timeout(time, timeUnit).lift(new Operator<T,T>() {

            private volatile T cachedItem;
            private volatile boolean isCurrent = false;

            @Override
            public Subscriber<? super T> call(Subscriber<? super T> s) {
                return new Subscriber<T>(s) {
                    @Override
                    public void onCompleted() {
                         if(!s.isUnsubscribed()) {
                              s.onCompleted();
                         }
                    }
                    @Override
                    public void onError(Throwable e) {
                        if(!s.isUnsubscribed()) {
                            if (e instanceof TimeoutException) { 
                                isCurrent = false;
                                cachedItem = null;
                            } else { 
                                s.onError(e);
                            }
                        }
                    }

                    @Override
                    public void onNext(T t) {
                        if(!s.isUnsubscribed()) { 
                            if (!isCurrent) { 
                                cachedItem = t;
                            }
                            s.onNext(cachedItem);
                        }
                    } 
                };
            }

        });
    }
like image 484
tmn Avatar asked Sep 27 '22 14:09

tmn


1 Answers

You may be able to use the timeout operator and a connectable observable (to hold and synchronize more than one subscriber):

mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items

Ths way you can respond to the error thrown by re-caching the expensive item. Assuming that's a "rare" case:

// if no emissions are made for a period of 3 seconds - will call onError
observableWithCache.timeout(3000, TimeUnit.MILLISECONDS).subscribe(new Subscriber<SomeObject>() {

    public void onCompleted() {

    }

    public void onError(Throwable arg0) {
        doClearCache(); // make sure to re-subscribe with timeout
    }

    public void onNext(SomeObject item) {
        System.out.println("Got item: " + item); // you can ignore this
    }
});

Note that onError doesn't cancel the original observable, as the diagram shows:

enter image description here

But you can react to a period of time during which no emissions were made.

like image 149
Reut Sharabani Avatar answered Sep 30 '22 09:09

Reut Sharabani