In the documentation for RetryWhen the example there goes like this:
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
But how do I propagate the Error if the retries runs out?
Adding .doOnError(System.out::println)
after the retryWhen
clause does not catch the error. Is it even emitted?
Adding a .doOnError(System.out::println)
before retryWhen displays always fails
for all retries.
The doc for retryWhen
says that it passes onError
notification to its subscribers and terminates. So you can do something like this:
final int ATTEMPTS = 3;
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> attempts
.zipWith(Observable.range(1, ATTEMPTS), (n, i) ->
i < ATTEMPTS ?
Observable.timer(i, SECONDS) :
Observable.error(n))
.flatMap(x -> x))
.toBlocking()
.forEach(System.out::println);
The Javadoc for retryWhen
states that:
If that Observable calls onComplete or onError then retry will call onCompleted or onError on the child subscription.
Put simply, if you want to propagate the exception, you'll need to rethrow the original exception once you've had enough retrying.
An easy way is to set your Observable.range
to be 1 greater than the number of times you want to retry.
Then in your zip function test the current number of retries. If it's equal to NUMBER_OF_RETRIES + 1, return Observable.error(throwable)
or re-throw your exception.
EG
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> {
if (attempt == NUMBER_OF_RETRIES + 1) {
throw Throwables.propagate(throwable);
}
else {
return attempt;
}
}).flatMap(i -> {
System.out.println("delaying retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
As an aside doOnError
does not affect the Observable in any way - it simply provides you with a hook to perform some action if an error occurs. A common example is logging.
One option is using Observable.materialize()
to convert Observable.range()
items into notifications. Then once onCompleted()
is issued, one can propagate error downstream (in sample below Pair
is used to wrap Observable.range()
notifications and exception from Observable
)
@Test
public void retryWhen() throws Exception {
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new)
.flatMap(notifAndEx -> {
System.out.println("delay retry by " + notifAndEx + " second(s)");
return notifAndEx.getRight().isOnCompleted()
? Observable.<Integer>error(notifAndEx.getLeft())
: Observable.timer(notifAndEx.getRight().getValue(), TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
}
private static class Pair<L,R> {
private final L left;
private final R right;
public Pair(L left, R right) {
this.left = left;
this.right = right;
}
public L getLeft() {
return left;
}
public R getRight() {
return right;
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With