Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to add a timeout to detect that an Observable didn't emit for a while

Tags:

rx-java

My use-case is following:

I created an Observable to access a remote server to fetch some data. However there could be no response and no exception from it forever due to the fact that the server is not well-designed. To work this around I want to have some timeout-with-retry mechanism.

Currently I try to initiate a timer to stop the request and throw the exception inside it and then retry until certain number of attempts or my real timeout. I tried to use mergeWith operator to merge the reqeust with Observable.interval mapped to generate errors using Observable.error(), however I can't get the error being captured in the subscriber and it looks like the Observable.interval never ends.

How should I deal with this situation with any operator in RXJAVA ?

My current code looks like this:

Observable.fromEmitter(fetchNetwork->...)
.mergeWith(Observable.interval(...)
           .flatmap(n->(observable.error)))
.retryWhen(error->(checkTimeExceed))
.subscribe(handleResult)
like image 858
iammini Avatar asked Mar 15 '17 09:03

iammini


1 Answers

You can use timeout() operator together with retryWhen():

Observable.fromEmitter(fetchNetwork->...)
     .timeout(TIMEOUT_VALUE, TimeUnit.SECONDS)
     .retryWhen(observable -> observable.flatMap(error -> {
                if (error instanceof TimeoutException) {
                    return Observable.just(new Object());
                } else {
                    return Observable.error(error);
                }
            }))
     .subscribe(handleResult)

this will timeout the request after TIMEOUT_VALUE seconds, and will retry as long the request was timeout, other errors will be propagate as usual to the subscriber onError().

like image 124
yosriz Avatar answered Jan 03 '23 22:01

yosriz