
Observable, retry on error and cache only if completed

We can use the cache() operator to avoid executing a long task (an HTTP request) multiple times and to reuse its result:

Observable apiCall = createApiCallObservable().cache(); // notice the .cache()

---------------------------------------------
// the first time we need it
apiCall.andSomeOtherStuff()
               .subscribe(subscriberA);

---------------------------------------------
//in the future when we need it again
apiCall.andSomeDifferentStuff()
               .subscribe(subscriberB);

The first time, the HTTP request is executed. The second time, because we used the cache() operator, the request is not executed again and the first result is reused.

This works fine when the first request completes successfully. But if onError is called in the first attempt, then the next time that a new subscriber subscribes to the same observable, the onError will be called again without attempting the http request again.

What we are trying to achieve is this: if onError is called the first time, then the next time someone subscribes to the same observable, the HTTP request should be attempted from scratch. In other words, the observable should cache only the successful API calls, i.e. those for which onCompleted was called.
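To make the desired behaviour concrete, here is a minimal plain-Java sketch of "remember the result only if the call succeeded". It deliberately uses no RxJava: `SuccessOnlyMemo`, its `demo()` method and the simulated error are hypothetical names invented for illustration, and a `Supplier` stands in for the HTTP request.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper (not an RxJava type): remembers a value only after a successful call.
final class SuccessOnlyMemo<T> {
    private final Supplier<T> task; // stands in for the HTTP request
    private T value;                // last successful result
    private boolean hasValue;       // false until a call has succeeded

    SuccessOnlyMemo(Supplier<T> task) {
        this.task = task;
    }

    // synchronized keeps the sketch simple; callers block while the task runs.
    synchronized T get() {
        if (!hasValue) {
            value = task.get(); // if this throws, hasValue stays false, so the next call retries
            hasValue = true;
        }
        return value;
    }

    // Demo: the task fails on its first run and succeeds afterwards.
    static String demo() {
        AtomicInteger runs = new AtomicInteger();
        SuccessOnlyMemo<String> memo = new SuccessOnlyMemo<>(() -> {
            if (runs.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated network error");
            }
            return "response";
        });
        String first;
        try {
            first = memo.get();
        } catch (IllegalStateException e) {
            first = "error";
        }
        String second = memo.get(); // runs the task again
        String third = memo.get();  // served from memory
        return first + "," + second + "," + third + ",runs=" + runs.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The failed first call leaves nothing behind, so the task runs a second time, and only the third call is served from memory. The question is how to get the same semantics out of an Observable.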

Any ideas about how to proceed? We've tried using the retry() and cache() operators without much luck.

Plato asked Oct 18 '15 08:10


3 Answers

Well, for anyone still interested, I think I have a nicer way to achieve it with rx.

The key is to use onErrorResumeNext, which lets you replace the Observable in case of an error. It should look something like this:

Observable<Object> apiCall = createApiCallObservable().cache(1);
// future call
apiCall.onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
    @Override
    public Observable<? extends Object> call(Throwable throwable) {
        return createApiCallObservable();
    }
});

That way, if the first call failed, the later call simply makes the request again (only once).

But every other caller that uses the first observable will also fail and then make its own separate request.

Since you hold a reference to the original observable, let's just update that reference.

So, a lazy getter:

Observable<Object> apiCall;
private Observable<Object> getCachedApiCall() {
    if ( apiCall == null){
        apiCall = createApiCallObservable().cache(1);
    }
    return apiCall;
}

Now, a getter that retries if the previous call failed:

private Observable<Object> getRetryableCachedApiCall() {
    return getCachedApiCall().onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
        public Observable<? extends Object> call(Throwable throwable) {
            apiCall = null;
            return getCachedApiCall();
        }
    });
}

Please note that it retries only once each time it is called.

So now your code will look something like this:

---------------------------------------------
// the first time we need it - this will be without a retry if you want..
getCachedApiCall().andSomeOtherStuff()
               .subscribe(subscriberA);

---------------------------------------------
//in the future when we need it again - for any other call so we will have a retry
getRetryableCachedApiCall().andSomeDifferentStuff()
               .subscribe(subscriberB);
ndori answered Sep 30 '22 21:09


This is the solution we ended up with, after extending akarnokd's solution:

import java.util.concurrent.Semaphore;

import rx.Observable;

public class OnErrorRetryCache<T> {

    public static <T> Observable<T> from(Observable<T> source) {
         return new OnErrorRetryCache<>(source).deferred;
    }

    private final Observable<T> deferred;
    private final Semaphore singlePermit = new Semaphore(1);

    private Observable<T> cache = null;
    private Observable<T> inProgress = null;

    private OnErrorRetryCache(Observable<T> source) {
        deferred = Observable.defer(() -> createWhenObserverSubscribes(source));
    }

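    private Observable<T> createWhenObserverSubscribes(Observable<T> source) {
        // Only one subscriber at a time may pass. The permit is released either when
        // the cached observable is returned or when the in-progress source terminates.
        singlePermit.acquireUninterruptibly();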

        Observable<T> cached = cache;
        if (cached != null) {
            singlePermit.release();
            return cached;
        }

        inProgress = source
                .doOnCompleted(this::onSuccess)
                .doOnTerminate(this::onTermination)
                .replay()
                .autoConnect();

        return inProgress;
    }

    private void onSuccess() {
        cache = inProgress;
    }

    private void onTermination() {
        inProgress = null;
        singlePermit.release();
    }
}

We needed to cache the result of an HTTP request from Retrofit, so this was written with an observable that emits a single item in mind.

If an observer subscribes while the HTTP request is in progress, we want it to wait rather than execute the request a second time, unless the in-progress request fails. To achieve that, the semaphore allows only one subscriber at a time into the block that creates or returns the cached observable, and when a new observable is created, the permit is held until that observable terminates. Tests for the above can be found here
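The gating idea can also be sketched without RxJava, as a blocking analogue rather than the class above. `SemaphoreGatedCache`, `demoConcurrentCallers()` and the two small helpers are hypothetical names, and a `Supplier` stands in for the Retrofit call. The demo starts a second caller while the first request is still in flight and counts how many requests were actually made.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical blocking analogue of the semaphore gate (plain JDK, no RxJava).
final class SemaphoreGatedCache<T> {
    private final Supplier<T> request;
    private final Semaphore singlePermit = new Semaphore(1);
    private T cached; // guarded by singlePermit; null means "nothing cached yet"

    SemaphoreGatedCache(Supplier<T> request) {
        this.request = request;
    }

    T get() {
        singlePermit.acquireUninterruptibly(); // later callers wait here while a request runs
        try {
            if (cached == null) {
                cached = request.get(); // a throw leaves cached == null, so the next caller retries
            }
            return cached;
        } finally {
            singlePermit.release();
        }
    }

    // Demo: a second caller arrives while the first request is still running.
    static int demoConcurrentCallers() {
        AtomicInteger requests = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayFinish = new CountDownLatch(1);
        SemaphoreGatedCache<String> cache = new SemaphoreGatedCache<>(() -> {
            requests.incrementAndGet();
            started.countDown();
            awaitQuietly(mayFinish); // keep the "request" in flight
            return "response";
        });
        Thread first = new Thread(cache::get);
        Thread second = new Thread(cache::get);
        first.start();
        awaitQuietly(started); // the first caller now holds the permit
        second.start();
        while (!cache.singlePermit.hasQueuedThreads()) {
            Thread.yield(); // wait until the second caller is parked on the semaphore
        }
        mayFinish.countDown();
        joinQuietly(first);
        joinQuietly(second);
        return requests.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("requests made: " + demoConcurrentCallers());
    }
}
```

In this sketch the second caller is served from the cache once the first request finishes, so only one request is made. Unlike the Rx version, the caller's thread blocks while it waits, which is acceptable for a sketch but is usually not what you want on a UI thread.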

Plato answered Sep 30 '22 19:09


You have to do some state-handling. Here is how I'd do this:

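import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;

public class CachedRetry {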

    public static final class OnErrorRetryCache<T> {
        final AtomicReference<Observable<T>> cached = 
                new AtomicReference<>();

        final Observable<T> result;

        public OnErrorRetryCache(Observable<T> source) {
            result = Observable.defer(() -> {
                for (;;) {
                    Observable<T> conn = cached.get();
                    if (conn != null) {
                        return conn;
                    }
                    Observable<T> next = source
                            .doOnError(e -> cached.set(null))
                            .replay()
                            .autoConnect();

                    if (cached.compareAndSet(null, next)) {
                        return next;
                    }
                }
            });
        }

        public Observable<T> get() {
            return result;
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Observable<Integer> source = Observable
                .just(1)
                .doOnSubscribe(() -> 
                    System.out.println("Subscriptions: " + (1 + calls.get())))
                .flatMap(v -> {
                    if (calls.getAndIncrement() == 0) {
                        return Observable.error(new RuntimeException());
                    }
                    return Observable.just(42);
                });

        Observable<Integer> o = new OnErrorRetryCache<>(source).get();

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));
    }
}

It works by caching a fully successful source and returning it to everyone. Otherwise, a (partially) failed source clears the cache, and the next observer triggers a resubscription.
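For readers who want to see the compare-and-set loop in isolation, here is a rough `CompletableFuture` analogue using only the JDK. It is a sketch of the same state-handling idea, not a port of the Rx code: `CasRetryCache`, `start()` and `demo()` are hypothetical names, and the source is assumed to be a `Supplier` that returns a new future for each attempt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical CompletableFuture analogue of the compare-and-set loop (plain JDK, no RxJava).
final class CasRetryCache<T> {
    private final AtomicReference<CompletableFuture<T>> cached = new AtomicReference<>();
    private final Supplier<CompletableFuture<T>> source;

    CasRetryCache(Supplier<CompletableFuture<T>> source) {
        this.source = source;
    }

    CompletableFuture<T> get() {
        for (;;) {
            CompletableFuture<T> current = cached.get();
            if (current != null) {
                return current; // in flight or already successful: share it
            }
            CompletableFuture<T> shared = new CompletableFuture<>();
            if (cached.compareAndSet(null, shared)) {
                start(shared); // only the caller that won the race starts the work
                return shared;
            }
            // Lost the race: loop again and pick up the winner's future.
        }
    }

    private void start(CompletableFuture<T> shared) {
        CompletableFuture<T> attempt;
        try {
            attempt = source.get();
        } catch (RuntimeException e) {
            attempt = new CompletableFuture<>();
            attempt.completeExceptionally(e);
        }
        attempt.whenComplete((value, error) -> {
            if (error != null) {
                cached.compareAndSet(shared, null); // forget the failure so the next caller retries
                shared.completeExceptionally(error);
            } else {
                shared.complete(value);
            }
        });
    }

    // Demo: the first attempt fails, later attempts yield 42.
    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        CasRetryCache<Integer> cache = new CasRetryCache<>(() -> {
            CompletableFuture<Integer> attempt = new CompletableFuture<>();
            if (calls.getAndIncrement() == 0) {
                attempt.completeExceptionally(new RuntimeException("first attempt fails"));
            } else {
                attempt.complete(42);
            }
            return attempt;
        });
        boolean firstFailed = cache.get().isCompletedExceptionally();
        int second = cache.get().join(); // triggers a fresh attempt
        int third = cache.get().join();  // reuses the successful future
        return firstFailed + "," + second + "," + third + ",calls=" + calls.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One deliberate difference in the sketch: the failure handler clears the reference with `compareAndSet(shared, null)` rather than an unconditional set, so it can only remove the future it belongs to.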

akarnokd answered Sep 30 '22 21:09