
Catch error if retryWhen's retries run out

The documentation for retryWhen gives this example:

Observable.create((Subscriber<? super String> s) -> {
  System.out.println("subscribing");
  s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
  return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
      System.out.println("delay retry by " + i + " second(s)");
      return Observable.timer(i, TimeUnit.SECONDS);
  });
}).toBlocking().forEach(System.out::println);

But how do I propagate the error if the retries run out?

Adding .doOnError(System.out::println) after the retryWhen clause does not catch the error. Is it even emitted?

Adding .doOnError(System.out::println) before retryWhen displays the "always fails" error for every attempt.

Theodor asked Jun 16 '16 13:06


3 Answers

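The documentation for retryWhen says that if the Observable returned by the notification handler emits onError, retryWhen passes that onError on to its subscribers and terminates. In the question's example the range simply completes after three items, so retryWhen completes instead of failing. So you can emit the error yourself once the attempts are used up: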

    final int ATTEMPTS = 3;

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> attempts
            // Pair each error n with a counter i = 1..ATTEMPTS.
            .zipWith(Observable.range(1, ATTEMPTS), (n, i) ->
                    i < ATTEMPTS ?
                            // Attempts remain: resubscribe after i seconds.
                            Observable.timer(i, TimeUnit.SECONDS) :
                            // Last attempt failed: propagate the original error.
                            Observable.<Long>error(n))
            .flatMap(x -> x))
            .toBlocking()
            .forEach(System.out::println);

Alexander Perfilyev answered Nov 09 '22 14:11


The Javadoc for retryWhen states that:

If that Observable calls onComplete or onError then retry will call onCompleted or onError on the child subscription.

Put simply, if you want to propagate the exception, you'll need to rethrow the original exception once you've had enough retrying.

An easy way is to make your Observable.range emit one more item than the number of times you want to retry.

Then, in your zip function, test the current attempt number. If it equals NUMBER_OF_RETRIES + 1, return Observable.error(throwable) or rethrow your exception.

For example:

Observable.create((Subscriber<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
    // The range is one item longer than NUMBER_OF_RETRIES so the final failure can be detected.
    return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> {
        if (attempt == NUMBER_OF_RETRIES + 1) {
            // Retries exhausted: rethrow the original error (Throwables is Guava's helper class).
            throw Throwables.propagate(throwable);
        } else {
            return attempt;
        }
    }).flatMap(i -> {
        System.out.println("delaying retry by " + i + " second(s)");
        return Observable.timer(i, TimeUnit.SECONDS);
    });
}).toBlocking().forEach(System.out::println);

As an aside, doOnError does not affect the Observable in any way; it simply provides a hook for performing some action when an error occurs. A common example is logging.
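
To make the policy above easier to see in isolation, here is a minimal plain-Java sketch of the same idea without RxJava: retry with a growing delay, let a logging hook look at every failure, and rethrow the original exception on failure number retries + 1. It is only an analogue, not how retryWhen is implemented. The names RetrySketch, callWithRetries, onError and delaySeconds are made up for this illustration, and the delay is handed to a callback instead of actually sleeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

public class RetrySketch {

    /**
     * Hypothetical helper: calls task until it succeeds.
     * Failures 1..retries are each followed by a delay of that many seconds;
     * failure number retries + 1 rethrows the original exception.
     */
    public static <T> T callWithRetries(Callable<T> task, int retries,
                                        Consumer<Throwable> onError,
                                        LongConsumer delaySeconds) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                onError.accept(e);            // side effect only, in the spirit of doOnError
                if (attempt == retries + 1) {
                    throw e;                  // retries ran out: propagate the original error
                }
                delaySeconds.accept(attempt); // delay grows 1, 2, 3, ...
            }
        }
    }

    public static void main(String[] args) {
        List<Long> delays = new ArrayList<>();
        try {
            callWithRetries(() -> { throw new RuntimeException("always fails"); },
                    3,
                    e -> System.out.println("failed: " + e.getMessage()),
                    delays::add);             // record the delay instead of sleeping
        } catch (Exception e) {
            System.out.println("propagated: " + e.getMessage() + ", delays " + delays);
        }
    }
}
```

Running main prints "failed: always fails" four times (the first call plus three retries) and then "propagated: always fails, delays [1, 2, 3]". The hook sees every failure but never swallows it, which is why a logging hook alone cannot change whether the error reaches the caller.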

Will answered Nov 09 '22 16:11


One option is to use Observable.materialize() to convert the items from Observable.range() into notifications. Once the onCompleted() notification arrives, you can propagate the error downstream. In the sample below, Pair wraps each Observable.range() notification together with the exception from the source Observable.

    @Test
    public void retryWhen() throws Exception {
        Observable.create((Subscriber<? super String> s) -> {
            System.out.println("subscribing");
            s.onError(new RuntimeException("always fails"));
        }).retryWhen(attempts -> {
            // materialize() turns the range's items and its completion into Notification values.
            return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new)
                    .flatMap(notifAndEx -> {
                        if (notifAndEx.getRight().isOnCompleted()) {
                            // The range is exhausted: propagate the source's error downstream.
                            return Observable.<Long>error(notifAndEx.getLeft());
                        }
                        int seconds = notifAndEx.getRight().getValue();
                        System.out.println("delay retry by " + seconds + " second(s)");
                        return Observable.timer(seconds, TimeUnit.SECONDS);
                    });
        }).toBlocking().forEach(System.out::println);
    }

    // Simple holder for the source's exception (left) and the range notification (right).
    private static class Pair<L, R> {
        private final L left;
        private final R right;

        public Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }

        public L getLeft() {
            return left;
        }

        public R getRight() {
            return right;
        }
    }

m.ostroverkhov answered Nov 09 '22 15:11