I'm creating a simple app for connecting to Bluetooth devices using the RxAndroidBle library (cheers, guys, for the great work!).
Sometimes when I connect to a device, I receive a GATT error with status 133.
I know this can happen, so I want to retry everything when that error occurs.
That's not a problem: I can easily do that with the retryWhen() operator. However, I have another requirement: the stream has to terminate after 30 seconds if the connection wasn't successful. I used timeout() for that, but the problem is that when I retry, the timer starts again.
So the question is how to combine the timeout() operator with retryWhen() so that I can retry on a particular error but keep the timer running.
I have some ideas involving combining observables, or a separate observable that checks the connection status after the timeout period, but I'm wondering whether I can do this in a single observable.
My code so far looks like this:
public Observable<ConnectingViewState> connectToDevice(String macAddress) {
    final RxBleDevice rxBleDevice = rxBleClient.getBleDevice(macAddress);
    return rxBleDevice.establishConnection(false)
            .subscribeOn(Schedulers.io())
            .map(rxBleConnection -> new ConnectingViewState.ConnectedViewState(rxBleConnection))
            .cast(ConnectingViewState.class)
            .timeout(40, TimeUnit.SECONDS)
            .startWith(new ConnectingViewState.ConnectingInProgressViewState())
            .retryWhen(errors -> errors.flatMap(error -> {
                if (isDefaultGattError(error)) {
                    return Observable.just(new Object());
                } else {
                    return Observable.error(error);
                }
            }))
            .onErrorReturn(throwable -> new ConnectingViewState.ErrorState(throwable));
}
The retryWhen operator works by re-subscribing to the chain of operators above it. Since you placed your timeout before it, said timeout is re-subscribed to and thus starts counting from the beginning again.
Placing the timeout after the retryWhen should apply a global timeout to the whole retriable flow.
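To illustrate the difference in operator order, here is a minimal sketch using RxJava 2 and a TestScheduler (virtual time). The failingConnection helper is a hypothetical stand-in for establishConnection(), not part of RxAndroidBle: it assumes every attempt fails with an IOException after 7 virtual seconds.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class TimeoutPlacementSketch {

    // Hypothetical stand-in for establishConnection(): each subscription counts
    // as one attempt and fails with an IOException after 7 virtual seconds.
    static Observable<String> failingConnection(TestScheduler scheduler, AtomicInteger attempts) {
        return Observable.defer(() -> {
            attempts.incrementAndGet();
            return Observable.timer(7, TimeUnit.SECONDS, scheduler)
                    .flatMap(tick -> Observable.<String>error(new IOException("simulated GATT error")));
        });
    }

    // Retries on IOException only; any other error is passed downstream.
    static Observable<String> retryOnIo(Observable<String> source) {
        return source.retryWhen(errors -> errors.flatMap(error ->
                error instanceof IOException
                        ? Observable.just(new Object())
                        : Observable.<Object>error(error)));
    }

    // Returns true if the stream ended with a TimeoutException
    // within 60 virtual seconds of subscribing.
    static boolean timesOut(boolean timeoutAfterRetry) {
        TestScheduler scheduler = new TestScheduler();
        AtomicInteger attempts = new AtomicInteger();
        Observable<String> connection = failingConnection(scheduler, attempts);

        Observable<String> chain = timeoutAfterRetry
                // timeout below retryWhen: one 30 s timer for the whole retried flow
                ? retryOnIo(connection).timeout(30, TimeUnit.SECONDS, scheduler)
                // timeout above retryWhen: a fresh 30 s timer on every re-subscription
                : retryOnIo(connection.timeout(30, TimeUnit.SECONDS, scheduler));

        TestObserver<String> observer = chain.test();
        scheduler.advanceTimeBy(60, TimeUnit.SECONDS);
        return observer.errorCount() == 1
                && observer.errors().get(0) instanceof TimeoutException;
    }
}
```

With these assumptions, timesOut(true) reports a TimeoutException, while timesOut(false) keeps retrying because every retry restarts the timer. One caveat for the chain in the question: timeout() measures the gap between emissions, so an item emitted on every re-subscription (such as the startWith placed above retryWhen) would also reset a timeout placed further down. Moving startWith below the timeout as well would presumably be needed there.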
As discussed, I have written a test with RxJava 2. The code was taken from the book 'Reactive Programming with RxJava' (page 257).
private final static int ATTEMPTS = 10;

@Test
public void name() throws Exception {
    Subject<Integer> establishConnection = PublishSubject.create();
    TestScheduler testScheduler = new TestScheduler();

    Observable<Integer> timeout = establishConnection
            .retryWhen(failures -> failures
                    .zipWith(Observable.range(1, ATTEMPTS), (err, attempt) -> {
                        // check here for your error if(...)
                        if (attempt < ATTEMPTS) {
                            long expDelay = (long) Math.pow(2, attempt - 2);
                            return Observable.timer(expDelay, TimeUnit.SECONDS, testScheduler);
                        } else {
                            return Observable.error(err);
                        }
                    })
                    .flatMap(x -> x))
            .timeout(30, TimeUnit.SECONDS, testScheduler)
            .onErrorResumeNext(throwable -> {
                if (throwable instanceof TimeoutException) {
                    return Observable.just(42);
                }
                return Observable.error(throwable);
            });

    TestObserver<Integer> test = timeout.test();

    testScheduler.advanceTimeBy(10, TimeUnit.SECONDS);
    establishConnection.onError(new IOException("Exception 1"));

    testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
    establishConnection.onError(new IOException("Exception 2"));

    testScheduler.advanceTimeBy(31, TimeUnit.SECONDS);

    test.assertValue(42);
}