I have the following RxJava chain:
override fun combineLocationToPlace(req: Flowable<Place>): Flowable<Place> {
var combinedFlowable = Flowable
.combineLatest(
req,
getLastLocation().lastOrError().toFlowable(),
BiFunction<Place, Location, Place> { t1, location ->
Timber.w("FIRSTINIT - Retrieved location $location")
var placeLocation = Location(t1.placeName)
placeLocation.latitude = t1.latitude
placeLocation.longitude = t1.longitude
t1.distance = location.distanceTo(placeLocation)
t1
})
return combinedFlowable
.onErrorResumeNext { t: Throwable ->
Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
req
}
.doOnError {
Timber.w("FIRSTINIT - did detect the error here...")
}
}
In short, I am retrieving some data from the local database (a place) and I want to combine it with the latest location from the GPS:
override fun getLastLocation(requestIfEmpty: Boolean): Observable<Location> {
var lastLocation = locationProvider.lastKnownLocation
.doOnNext {
Timber.w("Got location $it from last one")
}
.doOnComplete {
Timber.w("did i get a location?")
}
if (requestIfEmpty) {
Timber.w("Switching to request of location")
lastLocation = lastLocation.switchIfEmpty(requestLocation())
}
return lastLocation.doOnNext {
Timber.w("Got something!")
location = it
}
}
But I want to account for the scenario where the user does not have a last location, hence these lines:
return combinedFlowable
.onErrorResumeNext { t: Throwable ->
Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
req
}
.doOnError {
Timber.w("FIRSTINIT - did detect the error here...")
}
This is meant to catch any error and fall back to the original request alone, without combining it with anything. I am calling this code like this:
fun getPlace(placeId: String) {
locationManager.combineLocationToPlace(placesRepository.getPlace(placeId))
.onErrorResumeNext { t: Throwable ->
Timber.e(t, "Error resuming next! ")
placesRepository.getPlace(placeId)
}.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
.subscribeBy(
onNext = {
place.value = Result.success(it)
},
onError = {
Timber.e("ERROR! $it")
place.value = Result.failure(it)
}
)
.addTo(disposables)
}
However, when there is no location, a NoSuchElementException is thrown, my flowable switches to the original request, and then upon executing it I get a NetworkOnMainThreadException. Shouldn't this request be executed on the schedulerProvider.io() scheduler that I put in there (since the fallback code comes before the subscribeOn call)?
In case you are wondering, schedulerProvider.io() translates to:
Schedulers.io()
GetPlace:
/**
* Retrieves a single place from database
*/
override fun getPlace(id: String): Flowable<Place> {
return Flowable.merge(placesDao.getPlace(id),
refreshPlace(id).toFlowable())
}
/**
* Triggers a refreshPlace update on the db, useful when changing stuff associated with the place
* itself indirectly (e.g. an experience)
*/
private fun refreshPlace(id: String): Single<Place> {
return from(placesApi.getPlace(id))
.doOnSuccess {
placesDao.savePlace(it)
}
}
You can't use onErrorResumeNext to implement this functionality. You should use the retryWhen operator.
Maybe this post is useful for you:
https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a
This code polls a server with backoff until it receives a response code other than 204. You may be able to adapt it to your needs by using retryWhen instead of repeatWhen.
fun pollServerWithBackoff(videoId: String, maxAttempts: Int, delay: Int): Flowable<Response<ResponseBody>> {
return api.download(videoId)
.subscribeOn(Schedulers.io())
.repeatWhen {
it
.zipWith(Flowable.range(1, maxAttempts),
BiFunction { _: Any?, attempt: Int -> attempt })
.flatMap {
Flowable.timer((it * delay).toLong(), TimeUnit.SECONDS);
}
}
.takeUntil({
it.code() != 204
})
.filter {
it.code() != 204
}
.map{
// 2xx codes are treated as success; anything else becomes an error
if (it.code() in 200..299)
it
else
throw IOException(it.errorBody()?.string() ?: "Unknown error")
}
}
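As a rough idea of what the retryWhen variant could look like, here is a minimal sketch. It assumes RxJava 2 (the io.reactivex packages); the names retryWithBackoff and demoRetry are made up for illustration and come from neither the question nor the linked post. The source is resubscribed after each error with a growing delay, and the last error is passed downstream once the retries are used up.

```kotlin
import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: resubscribes to a failing Flowable up to maxRetries times,
// waiting attempt * delayMillis before each retry.
fun <T : Any> Flowable<T>.retryWithBackoff(maxRetries: Int, delayMillis: Long): Flowable<T> =
    retryWhen { errors: Flowable<Throwable> ->
        errors
            // Pair every error with its attempt number; the extra (maxRetries + 1)th
            // slot is used to detect that the retries are exhausted.
            .zipWith(
                Flowable.range(1, maxRetries + 1),
                BiFunction<Throwable, Int, Pair<Throwable, Int>> { error, attempt -> error to attempt }
            )
            .flatMap { pair ->
                val (error, attempt) = pair
                if (attempt > maxRetries) Flowable.error<Long>(error) // give up
                else Flowable.timer(attempt * delayMillis, TimeUnit.MILLISECONDS)
            }
    }

// Illustration only: a source that fails twice and then succeeds.
fun demoRetry(): Pair<String, Int> {
    val attempts = AtomicInteger()
    val value = Flowable.fromCallable<String> {
        if (attempts.incrementAndGet() < 3) throw IllegalStateException("not yet") else "ok"
    }.retryWithBackoff(maxRetries = 5, delayMillis = 10L).blockingFirst()
    return value to attempts.get()
}
```

Note that retrying only helps when the failure is transient. If there is simply no last known location, every retry will fail the same way.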
To ensure your networking happens off the main thread, send it to a background thread explicitly.
Use the IO, new thread or computation scheduler from the Rx Schedulers class:
subscribeOn(Schedulers.computation())
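To illustrate what "explicitly" can mean for a fallback passed to onErrorResumeNext, here is a small sketch. It assumes RxJava 2; failingSource, blockingSource and fallbackThread are hypothetical stand-ins, and Schedulers.single() merely plays the role of whatever thread delivers the error (for example a location callback). The fallback is subscribed on the thread that delivered the error, so a subscribeOn further down the chain does not move it, while a subscribeOn applied to the fallback itself does.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

// Stand-in for the location stream failing because there is no last location.
fun failingSource(): Flowable<String> =
    Flowable.error<String>(NoSuchElementException("no last location"))

// Stand-in for a request that does its work at subscription time;
// it simply reports the thread it runs on.
fun blockingSource(): Flowable<String> =
    Flowable.fromCallable { Thread.currentThread().name }

// Returns the name of the thread on which the fallback was executed.
fun fallbackThread(pinToIo: Boolean): String {
    val fallback =
        if (pinToIo) blockingSource().subscribeOn(Schedulers.io()) else blockingSource()
    return failingSource()
        .observeOn(Schedulers.single())                 // the error arrives on this thread
        .onErrorResumeNext { _: Throwable -> fallback } // fallback is subscribed where the error arrived
        .subscribeOn(Schedulers.io())                   // only decides where the original upstream is subscribed
        .blockingFirst()
}
```

Under these assumptions, fallbackThread(false) reports an RxSingleScheduler thread even though the chain ends with subscribeOn(Schedulers.io()), whereas fallbackThread(true) reports an RxCachedThreadScheduler (IO) thread.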
For cases when you don't want to do this (or if you believe that it should already be on a background thread and just want to debug), you can log thread information as follows:
Thread.currentThread().getName()
This can be particularly useful for tracking what is going on when you are using Schedulers.trampoline(), or if you have different schedulers for observe and subscribe, as in your example:
.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
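A quick way to see the effect of such a subscribeOn/observeOn pair is to record the thread name at each stage. This is only a sketch, assuming RxJava 2; threadsSeen is a made-up name, and Schedulers.single() stands in for the UI scheduler, which is not available outside Android.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList

// Records which thread runs the source and which one runs the downstream callbacks.
fun threadsSeen(): List<String> {
    val seen = CopyOnWriteArrayList<String>()
    Flowable.fromCallable {
        seen.add("source: ${Thread.currentThread().name}")
        1
    }
        .subscribeOn(Schedulers.io())     // where the source is subscribed and does its work
        .observeOn(Schedulers.single())   // where everything below this line is delivered
        .doOnNext { seen.add("downstream: ${Thread.currentThread().name}") }
        .blockingSubscribe()              // block until the stream completes
    return seen
}
```

In a real chain, Timber.w calls inside doOnSubscribe and doOnNext would serve the same purpose as the list used here.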