Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

rxjava: Can I use retry() but with delay?

Tags:

java

rx-java

I am using rxjava in my Android app to handle network requests asynchronously. Now I would like to retry a failed network request only after a certain time has passed.

Is there any way to use retry() on an Observable but to retry only after a certain delay?

Is there a way to let the Observable know that is is currently being retried (as opposed to tried for the first time)?

I had a look at debounce()/throttleWithTimeout() but they seem to be doing something different.

Edit:

I think I found one way to do it, but I'd be interested in either confirmation that this is the correct way to do it or for other, better ways.

What I am doing is this: In the call() method of my Observable.OnSubscribe, before I call the Subscribers onError() method, I simply let the Thread sleep for the desired amount of time. So, to retry every 1000 milliseconds, I do something like this:

@Override public void call(Subscriber<? super List<ProductNode>> subscriber) {     try {         Log.d(TAG, "trying to load all products with pid: " + pid);         subscriber.onNext(productClient.getProductNodesForParentId(pid));         subscriber.onCompleted();     } catch (Exception e) {         try {             Thread.sleep(1000);         } catch (InterruptedException e1) {             e.printStackTrace();         }         subscriber.onError(e);     } } 

Since this method is running on an IO thread anyway it does not block the UI. The only problem I can see is that even the first error is reported with delay so the delay is there even if there's no retry(). I'd like it better if the delay wasn't applied after an error but instead before a retry (but not before the first try, obviously).

like image 708
david.mihola Avatar asked Feb 27 '14 11:02

david.mihola


People also ask

How does retryWhen work?

retryWhen operator works in the following way: Subscribe to a source observable. When a new value arrives from a source observable, send the value to the observer. If the source observable throws an error, execute a callback function and subscribe to the returned guiding observable.

What happens when an error occurs using RxJava?

RxJava Error Handling That means that after error happened stream is basically finished and no more events can come through it. If Consumer didn't handle error in Observer callback, then that error is sent to a global error handler (which in case of Android crashes the app by default).

Is RxJava deprecated?

RxJava, once the hottest framework in Android development, is dying. It's dying quietly, without drawing much attention to itself. RxJava's former fans and advocates moved on to new shiny things, so there is no one left to say a proper eulogy over this, once very popular, framework.

Is RxJava asynchronous?

RxJava is a specific implementation of reactive programming for Java and Android that is influenced by functional programming. It favors function composition, avoidance of global state and side effects, and thinking in streams to compose asynchronous and event-based programs.


1 Answers

You can use the retryWhen() operator to add retry logic to any Observable.

The following class contains the retry logic:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {     private final int maxRetries;     private final int retryDelayMillis;     private int retryCount;      public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {         this.maxRetries = maxRetries;         this.retryDelayMillis = retryDelayMillis;         this.retryCount = 0;     }      @Override     public Observable<?> apply(final Observable<? extends Throwable> attempts) {         return attempts                 .flatMap(new Function<Throwable, Observable<?>>() {                     @Override                     public Observable<?> apply(final Throwable throwable) {                         if (++retryCount < maxRetries) {                             // When this Observable calls onNext, the original                             // Observable will be retried (i.e. re-subscribed).                             return Observable.timer(retryDelayMillis,                                     TimeUnit.MILLISECONDS);                         }                          // Max retries hit. Just pass the error along.                         return Observable.error(throwable);                     }                 });     } } 

RxJava 1.x

public class RetryWithDelay implements         Func1<Observable<? extends Throwable>, Observable<?>> {      private final int maxRetries;     private final int retryDelayMillis;     private int retryCount;      public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {         this.maxRetries = maxRetries;         this.retryDelayMillis = retryDelayMillis;         this.retryCount = 0;     }      @Override     public Observable<?> call(Observable<? extends Throwable> attempts) {         return attempts                 .flatMap(new Func1<Throwable, Observable<?>>() {                     @Override                     public Observable<?> call(Throwable throwable) {                         if (++retryCount < maxRetries) {                             // When this Observable calls onNext, the original                             // Observable will be retried (i.e. re-subscribed).                             return Observable.timer(retryDelayMillis,                                     TimeUnit.MILLISECONDS);                         }                          // Max retries hit. Just pass the error along.                         return Observable.error(throwable);                     }                 });     } } 

Usage:

// Add retry logic to existing observable. // Retry max of 3 times with a delay of 2 seconds. observable     .retryWhen(new RetryWithDelay(3, 2000)); 
like image 93
kjones Avatar answered Oct 09 '22 23:10

kjones