
Combining timeout() with retryWhen()

I'm creating a simple app for connecting to Bluetooth devices using the RxAndroidBle library (cheers, guys, for the great work!). Sometimes when I connect to a device I receive a GATT error with status 133. I know this can happen, so I want to retry the whole flow when that error occurs. That part is not a problem: I can easily do it with the retryWhen() operator. However, I have another requirement: the stream has to terminate after 30 seconds if the connection wasn't successful. I used timeout() for that, but the problem is that whenever a retry happens, the timer starts again.

So the question is how to combine the timeout() operator with retryWhen() so that I can retry on one particular error while the timeout keeps counting across retries.

I have some ideas involving combining observables, or a separate observable that checks the connection status after the timeout period, but I'm wondering whether I can do this in a single observable.

My code so far looks like this:

public Observable<ConnectingViewState> connectToDevice(String macAddress) {
    final RxBleDevice rxBleDevice = rxBleClient.getBleDevice(macAddress);
    return rxBleDevice.establishConnection(false)
            .subscribeOn(Schedulers.io())
            .map(rxBleConnection -> new ConnectingViewState.ConnectedViewState(rxBleConnection))
            .cast(ConnectingViewState.class)
            .timeout(40, TimeUnit.SECONDS)
            .startWith(new ConnectingViewState.ConnectingInProgressViewState())
            .retryWhen(errors -> errors.flatMap(error -> {
                if (isDefaultGattError(error)) {
                    return Observable.just(new Object());
                } else {
                    return Observable.error(error);
                }
            }))
            .onErrorReturn(throwable -> new ConnectingViewState.ErrorState(throwable));
}
Jogosb asked May 03 '17 17:05


2 Answers

The retryWhen operator works by re-subscribing to the chain of operators above it. Since you placed timeout() before it, the timeout is re-subscribed as well and therefore starts counting from the beginning on every retry.

Placing the timeout after the retryWhen should apply a global timeout to the whole retriable flow.
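
To make the difference concrete, here is a minimal, library-free sketch in plain Java. It is not RxJava code and is not taken from the answer: the class RetryDeadlineSketch, both method names, and the attempt durations are made up for illustration. It only models the bookkeeping, assuming every attempt ends in a retriable error and retries happen without delay. One method models a timer that is re-subscribed with each attempt, the other a single timer wrapped around the whole retried flow.

```java
// Illustrative model only: names and numbers are hypothetical, no RxJava involved.
class RetryDeadlineSketch {

    // Models timeout() placed ABOVE retryWhen(): the timer belongs to the part
    // of the chain that is re-subscribed, so it restarts with every attempt.
    // Returns the total seconds elapsed when the flow stops.
    static long elapsedWithTimerRestartedPerAttempt(long[] attemptDurations, long timeoutSeconds) {
        long elapsed = 0;
        for (long duration : attemptDurations) {
            if (duration >= timeoutSeconds) {
                // Only a single attempt that is too slow can trip the timer.
                return elapsed + timeoutSeconds;
            }
            // The attempt failed with a retriable error; the retry resets the timer.
            elapsed += duration;
        }
        return elapsed; // all scripted attempts ran, the timer never fired
    }

    // Models timeout() placed BELOW retryWhen(): one timer for the whole flow,
    // started once and unaffected by retries.
    static long elapsedWithSingleDeadline(long[] attemptDurations, long timeoutSeconds) {
        long elapsed = 0;
        for (long duration : attemptDurations) {
            if (elapsed + duration >= timeoutSeconds) {
                // The deadline is reached while this attempt is still running.
                return timeoutSeconds;
            }
            elapsed += duration;
        }
        return elapsed;
    }

    public static void main(String[] args) {
        long[] attempts = {10, 15, 10, 25}; // seconds until each attempt fails
        System.out.println(elapsedWithTimerRestartedPerAttempt(attempts, 30)); // prints 60
        System.out.println(elapsedWithSingleDeadline(attempts, 30));           // prints 30
    }
}
```

With four failing attempts of 10, 15, 10 and 25 seconds and a 30-second limit, the restarted timer never fires and 60 seconds pass, while the single deadline ends the flow at 30 seconds.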

Simon Baslé answered Oct 05 '22 10:10


As discussed, I have written a test with RxJava 2. The code was taken from the book 'Reactive Programming with RxJava' (page 257).

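import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Test;

// Class name is illustrative; the original snippet showed only the members.
public class TimeoutAfterRetryTest {

    private static final int ATTEMPTS = 10;

    @Test
    public void name() throws Exception {
        Subject<Integer> establishConnection = PublishSubject.create();
        TestScheduler testScheduler = new TestScheduler();

        Observable<Integer> timeout = establishConnection
                .retryWhen(failures -> failures
                        .zipWith(Observable.range(1, ATTEMPTS), (err, attempt) -> {
                            // check here for your error if(...)

                            if (attempt < ATTEMPTS) {
                                // Exponential backoff: 0 s (0.5 truncated), 1 s, 2 s, 4 s, ...
                                long expDelay = (long) Math.pow(2, attempt - 2);
                                return Observable.timer(expDelay, TimeUnit.SECONDS, testScheduler);
                            } else {
                                // Out of attempts: propagate the last error.
                                return Observable.error(err);
                            }
                        })
                        .flatMap(x -> x))
                // timeout() sits below retryWhen(), so retries do not restart it.
                .timeout(30, TimeUnit.SECONDS, testScheduler)
                // Map the timeout to a fallback value; pass other errors through.
                .onErrorResumeNext(throwable -> {
                    if (throwable instanceof TimeoutException) {
                        return Observable.just(42);
                    }
                    return Observable.error(throwable);
                });

        TestObserver<Integer> test = timeout.test();

        testScheduler.advanceTimeBy(10, TimeUnit.SECONDS);
        establishConnection.onError(new IOException("Exception 1"));

        testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
        establishConnection.onError(new IOException("Exception 2"));

        testScheduler.advanceTimeBy(31, TimeUnit.SECONDS);

        // No item arrived within 30 s of virtual time, so the fallback is emitted.
        test.assertValue(42);
    }
}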
Hans Wurst answered Oct 05 '22 10:10