I'm experimenting with the RxJava retryWhen operator. I found very little about it online; the only write-up worth mentioning is this one, and even that doesn't explore the use cases I'd like to understand. I also added asynchronous execution and retry with back-off to make the scenario more realistic.
My setup is simple: I have a class, ChuckNorrisJokesRepository, that returns a requested number of random Chuck Norris jokes from a JSON file. My class under test is ChuckNorrisJokesService, which is shown below. The use cases I'm interested in are as follows:
Note: The project is available on my GitHub.
ChuckNorrisJokesService.java:
@Slf4j
@Builder
public class ChuckNorrisJokesService {
    @Getter
    private final AtomicReference<Jokes> jokes = new AtomicReference<>(new Jokes());
    private final Scheduler scheduler;
    private final ChuckNorrisJokesRepository jokesRepository;
    private final CountDownLatch latch;
    private final int numRetries;
    private final Map<String, List<String>> threads;

    public static class ChuckNorrisJokesServiceBuilder {
        public ChuckNorrisJokesService build() {
            if (scheduler == null) {
                scheduler = Schedulers.io();
            }
            if (jokesRepository == null) {
                jokesRepository = new ChuckNorrisJokesRepository();
            }
            if (threads == null) {
                threads = new ConcurrentHashMap<>();
            }
            requireNonNull(latch, "CountDownLatch must not be null.");
            return new ChuckNorrisJokesService(scheduler, jokesRepository, latch, numRetries, threads);
        }
    }

    public void setRandomJokes(int numJokes) {
        mergeThreadNames("getRandomJokes");
        Observable.fromCallable(() -> {
            log.debug("fromCallable - before call. Latch: {}.", latch.getCount());
            mergeThreadNames("fromCallable");
            latch.countDown();
            List<Joke> randomJokes = jokesRepository.getRandomJokes(numJokes);
            log.debug("fromCallable - after call. Latch: {}.", latch.getCount());
            return randomJokes;
        }).retryWhen(errors ->
                errors.zipWith(Observable.range(1, numRetries), (n, i) -> i).flatMap(retryCount -> {
                    log.debug("retryWhen. retryCount: {}.", retryCount);
                    mergeThreadNames("retryWhen");
                    return Observable.timer(retryCount, TimeUnit.SECONDS);
                }))
                .subscribeOn(scheduler)
                .subscribe(j -> {
                            log.debug("onNext. Latch: {}.", latch.getCount());
                            mergeThreadNames("onNext");
                            jokes.set(new Jokes("success", j));
                            latch.countDown();
                        },
                        ex -> {
                            log.error("onError. Latch: {}.", latch.getCount(), ex);
                            mergeThreadNames("onError");
                        },
                        () -> {
                            log.debug("onCompleted. Latch: {}.", latch.getCount());
                            mergeThreadNames("onCompleted");
                            latch.countDown();
                        }
                );
    }

    private void mergeThreadNames(String methodName) {
        threads.merge(methodName,
                new ArrayList<>(Arrays.asList(Thread.currentThread().getName())),
                (value, newValue) -> {
                    value.addAll(newValue);
                    return value;
                });
    }
}
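The mergeThreadNames helper above relies on Map.merge to accumulate one list of thread names per callback. As a rough, standalone illustration of that accumulation (the class name ThreadNameRecorder, its methods, and the thread names "io-1" and "io-2" are hypothetical and not part of the project), a minimal sketch could look like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, for illustration only: mirrors the merge logic of mergeThreadNames.
final class ThreadNameRecorder {
    private final Map<String, List<String>> threads = new ConcurrentHashMap<>();

    // First call for a key stores the one-element list; later calls append to the stored list.
    void record(String methodName, String threadName) {
        threads.merge(methodName,
                new ArrayList<>(Arrays.asList(threadName)),
                (existing, incoming) -> {
                    existing.addAll(incoming);
                    return existing;
                });
    }

    // Returns the recorded names, or an empty list if the callback was never recorded.
    List<String> namesFor(String methodName) {
        return threads.getOrDefault(methodName, new ArrayList<>());
    }

    public static void main(String[] args) {
        ThreadNameRecorder recorder = new ThreadNameRecorder();
        recorder.record("fromCallable", "io-1");
        recorder.record("fromCallable", "io-2");
        System.out.println(recorder.namesFor("fromCallable")); // prints [io-1, io-2]
        System.out.println(recorder.namesFor("retryWhen"));    // prints []
    }
}
```

Each callback that runs produces exactly one merge call for its key, which is why a test can count merge invocations per key to check which callbacks ran and how often.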
For brevity, I'll only show the Spock test case for the 1st use case. See my GitHub for the other test cases.
def "succeeds on 1st attempt"() {
    setup:
    CountDownLatch latch = new CountDownLatch(2)
    Map<String, List<String>> threads = Mock(Map)
    ChuckNorrisJokesService service = ChuckNorrisJokesService.builder()
            .latch(latch)
            .threads(threads)
            .build()

    when:
    service.setRandomJokes(3)
    latch.await(2, TimeUnit.SECONDS)
    Jokes jokes = service.jokes.get()

    then:
    jokes.status == 'success'
    jokes.count() == 3
    1 * threads.merge('getRandomJokes', *_)
    1 * threads.merge('fromCallable', *_)
    0 * threads.merge('retryWhen', *_)
    1 * threads.merge('onNext', *_)
    0 * threads.merge('onError', *_)
    1 * threads.merge('onCompleted', *_)
}
This fails with:
Too few invocations for:
1 * threads.merge('fromCallable', *_) (0 invocations)
1 * threads.merge('onNext', *_) (0 invocations)
What I'm expecting is that fromCallable is called once, it succeeds, onNext is called once, followed by onCompleted. What am I missing?
P.S.: Full disclosure - I've also posted this question on RxJava GitHub.
I solved this after several hours of troubleshooting and with help from ReactiveX member David Karnok.
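retryWhen is a complicated, perhaps even buggy, operator. The official doc and at least one answer here use the range operator, which completes immediately if there are no retries to be made. In the test above, numRetries is never set on the builder, so it defaults to 0 and Observable.range(1, 0) is empty. Zipping the errors with an empty range completes the observable returned by the retryWhen handler right away, and that in turn completes the whole sequence. This matches the failure: onCompleted fires, but fromCallable and onNext are never invoked. See my discussion with David Karnok.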
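To make the intended behaviour concrete, here is a minimal plain-Java sketch (deliberately without RxJava) of the retry policy the service seems to be aiming for: run the call once, then retry up to a maximum number of times, waiting a little longer before each retry. The names RetryWithBackoffSketch and callWithRetries, and the millisecond delay unit, are illustrative assumptions and not code from the project; this is not the fix that ended up in the GitHub repository.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: linear back-off retry in plain Java, for illustration only.
final class RetryWithBackoffSketch {

    // Calls the task once, then retries up to maxRetries more times.
    // Before the n-th retry it sleeps n * delayUnitMillis (linear back-off).
    static <T> T callWithRetries(Callable<T> task, int maxRetries, long delayUnitMillis)
            throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception ex) {
                if (retryCount >= maxRetries) {
                    throw ex; // retries exhausted: surface the last failure
                }
                retryCount++;
                TimeUnit.MILLISECONDS.sleep(retryCount * delayUnitMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated flaky call: fails on the first two attempts, succeeds on the third.
        String result = callWithRetries(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("simulated failure");
            }
            return "joke";
        }, 3, 10L);
        System.out.println(result + " after " + attempts.get() + " attempts"); // prints joke after 3 attempts
    }
}
```

The point of the sketch is the zero-retries case: with maxRetries set to 0 the task still runs exactly once, whereas a retryWhen handler built on an empty range completes before the source gets a chance to emit.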
The code is available on my GitHub complete with the following test cases: