
Error with Flowable's onErrorResumeNext, networkOnMainThread

I have the following RxJava chain:

    override fun combineLocationToPlace(req: Flowable<Place>): Flowable<Place> {
        val combinedFlowable = Flowable
                .combineLatest(
                        req,
                        getLastLocation().lastOrError().toFlowable(),
                        BiFunction<Place, Location, Place> { t1, location ->
                            Timber.w("FIRSTINIT - Retrieved location $location")
                            val placeLocation = Location(t1.placeName)
                            placeLocation.latitude = t1.latitude
                            placeLocation.longitude = t1.longitude
                            t1.distance = location.distanceTo(placeLocation)
                            t1
                        })

        // On any error, fall back to the original request without the location
        return combinedFlowable
                .onErrorResumeNext { t: Throwable ->
                    Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
                    req
                }
                .doOnError {
                    Timber.w("FIRSTINIT - did detect the error here...")
                }
    }

In short, I am retrieving some data from the local database (a place) and I want to combine it with the latest location from the GPS:

 override fun getLastLocation(requestIfEmpty: Boolean): Observable<Location> {
        var lastLocation = locationProvider.lastKnownLocation
                .doOnNext {
                    Timber.w("Got location $it from last one")
                }
                .doOnComplete {
                    Timber.w("did i get a location?")
                }

        if (requestIfEmpty) {
            Timber.w("Switching to request of location")
            lastLocation = lastLocation.switchIfEmpty(requestLocation())
        }

        return lastLocation.doOnNext {
            Timber.w("Got something!")
            location = it
        }
    }

But I want to account for the scenario where the user does not have a last location, hence this block:

return combinedFlowable
                    .onErrorResumeNext { t: Throwable ->
                        Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
                        req
                    }
                    .doOnError {
                        Timber.w("FIRSTINIT - did detect the error here...")
                    }

It is meant to catch any error and fall back to the original request, without combining it with anything. I am calling this code like this:

fun getPlace(placeId: String) {
        locationManager.combineLocationToPlace(placesRepository.getPlace(placeId))
                .onErrorResumeNext { t: Throwable ->
                    Timber.e(t, "Error resuming next! ")
                    placesRepository.getPlace(placeId)
                }.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
                .subscribeBy(
                        onNext = {
                            place.value = Result.success(it)
                        },
                        onError = {
                            Timber.e("ERROR! $it")
                            place.value = Result.failure(it)
                        }
                )
                .addTo(disposables)

    }

However, when there is no location, a NoSuchElementException is thrown, my Flowable switches to the original request, and then upon executing it I get a NetworkOnMainThreadException. Shouldn't this request be executed on the schedulerProvider.io() scheduler that I put in there (since the fallback comes before subscribeOn in the chain)?

In case you are wondering, schedulerProvider.io() translates to:

Schedulers.io()

GetPlace:

    /**
     * Retrieves a single place from database
     */
    override fun getPlace(id: String): Flowable<Place> {
        return Flowable.merge(placesDao.getPlace(id),
                refreshPlace(id).toFlowable())
    }

    /**
     * Triggers a refreshPlace update on the db, useful when changing stuff associated with the place
     * itself indirectly (e.g. an experience)
     */
    private fun refreshPlace(id: String): Single<Place> {
        return from(placesApi.getPlace(id))
                .doOnSuccess {
                    placesDao.savePlace(it)
                }
    }
MichelReap asked Jul 19 '18 11:07


2 Answers

You can't use onErrorResumeNext to implement this functionality. You should use the retryWhen operator.

Maybe this post is useful for you:

https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a

This code polls a server with backoff until it receives a response code other than 204. Maybe you can adapt it to your needs by using retryWhen instead of repeatWhen.

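    fun pollServerWithBackoff(videoId: String, maxAttempts: Int, delay: Int): Flowable<Response<ResponseBody>> {
        return api.download(videoId)
                .subscribeOn(Schedulers.io())
                // Resubscribe after each completion, waiting attempt * delay seconds, at most maxAttempts times
                .repeatWhen {
                    it
                            .zipWith(Flowable.range(1, maxAttempts),
                                    BiFunction { _: Any?, attempt: Int -> attempt })
                            .flatMap { attempt ->
                                Flowable.timer((attempt * delay).toLong(), TimeUnit.SECONDS)
                            }
                }
                // Stop polling at the first response that is not 204, and drop the 204s
                .takeUntil({ it.code() != 204 })
                .filter { it.code() != 204 }
                .map {
                    if (it.code() in 200..299)
                        it
                    else
                        // string() reads the error body; toString() would only print the object reference
                        throw IOException(it.errorBody()?.string() ?: "Unknown Error")
                }
    }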
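
As a rough idea of what the retryWhen variant could look like, here is a minimal sketch, assuming RxJava 2 on the classpath. The names `retryWithBackoff` and `retryDemo` are hypothetical helpers invented for this illustration, and the flaky source merely simulates a call that fails twice before succeeding.

```kotlin
import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import java.io.IOException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: resubscribes after an error, waiting attempt * delayMillis,
// and gives up with the last error once maxRetries retries have been used.
fun <T> Flowable<T>.retryWithBackoff(maxRetries: Int, delayMillis: Long): Flowable<T> =
    retryWhen { errors ->
        errors
            .zipWith(Flowable.range(1, maxRetries + 1),
                BiFunction<Throwable, Int, Pair<Throwable, Int>> { error, attempt -> error to attempt })
            .flatMap { (error, attempt) ->
                if (attempt > maxRetries) Flowable.error<Long>(error)
                else Flowable.timer(attempt * delayMillis, TimeUnit.MILLISECONDS)
            }
    }

// Simulated source (an assumption for the demo): fails on the first two subscriptions.
fun retryDemo(): String {
    val calls = AtomicInteger()
    val flaky = Flowable.fromCallable {
        if (calls.incrementAndGet() < 3) throw IOException("transient failure")
        "ok after ${calls.get()} calls"
    }
    return flaky.retryWithBackoff(maxRetries = 3, delayMillis = 10).blockingFirst()
}

fun main() {
    println(retryDemo())
}
```

Note that this retries the same source; it does not switch to a different fallback source the way onErrorResumeNext does.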
Roger Garzon Nieto answered Nov 08 '22 20:11


To ensure your networking happens off the main thread, send it to a background thread explicitly.

Use either the IO, New Thread or Computation schedulers from the Rx Schedulers class:

subscribeOn(Schedulers.computation())
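
One detail worth illustrating: subscribeOn only decides the thread on which the chain is first subscribed. A source that is subscribed later, from inside onErrorResumeNext, starts on whichever thread delivered the error unless it carries its own subscribeOn. The sketch below is a plain-JVM approximation, assuming RxJava 2 on the classpath. `request`, `failingLocation` and `loadWithFallback` are hypothetical stand-ins, and Schedulers.single() plays the role of the thread that delivers the location error (the main thread on Android).

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the network-backed request: reports the thread it runs on.
val request: Flowable<String> = Flowable.fromCallable {
    "request ran on ${Thread.currentThread().name}"
}

// Hypothetical stand-in for the location source: fails asynchronously on the
// single() scheduler, the way a location callback might fail on the main thread.
val failingLocation: Flowable<String> =
    Flowable.timer(50, TimeUnit.MILLISECONDS, Schedulers.single())
        .flatMap { Flowable.error<String>(NoSuchElementException("no last location")) }

fun loadWithFallback(scheduleFallback: Boolean): String =
    failingLocation
        .onErrorResumeNext { _: Throwable ->
            // The fallback is subscribed on the thread that delivered the error.
            if (scheduleFallback) request.subscribeOn(Schedulers.io()) else request
        }
        .subscribeOn(Schedulers.io()) // only affects the initial subscription
        .blockingFirst()

fun main() {
    println(loadWithFallback(scheduleFallback = false)) // fallback runs on the error's thread
    println(loadWithFallback(scheduleFallback = true))  // fallback runs on an io() thread
}
```

If this matches what happens in the question, giving the fallback (or the network call itself) its own subscribeOn would keep it off the main thread.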

For cases when you don't want to do this (or if you believe that it should already be on a background thread and just want to debug), you can log thread information as follows:

Thread.currentThread().getName()

This can be particularly useful for tracking what is going on when you are using Schedulers.trampoline(), or if you have different schedulers for observe and subscribe, like in your example:

.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
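
As a rough illustration of that kind of logging, the sketch below records the thread name before and after observeOn. It assumes RxJava 2 on a plain JVM; `threadTrace` is a hypothetical helper, and Schedulers.single() stands in for schedulerProvider.ui(), which is not available outside Android.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: collects the thread names seen at two points in the chain.
fun threadTrace(): List<String> {
    val trace = CopyOnWriteArrayList<String>()
    Flowable.fromCallable {
        // Runs on the scheduler chosen by subscribeOn.
        trace.add("source: ${Thread.currentThread().name}")
        1
    }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single()) // stand-in for the UI scheduler
        .doOnNext {
            // Runs on the scheduler chosen by observeOn.
            trace.add("after observeOn: ${Thread.currentThread().name}")
        }
        .blockingSubscribe()
    return trace
}

fun main() {
    threadTrace().forEach(::println)
}
```

In an Android app the same Thread.currentThread().name calls could be passed to Timber instead of being collected in a list.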
Nick Cardoso answered Nov 08 '22 20:11