Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Zipping Room Flowable prevents updates

I have a getPlaces method in my repository:

override fun getPlaces(filter: FilterRequest): Flowable<List<Place>> {
    return from(placesApi.filter(filter))
            .doOnSuccess {
                placesDao.savePlaces(it)
            }
            .flatMapPublisher { it ->
                placesDao.getPlaces(it.map { it.placeId })
            }
}

This method gathers the result from an api, then saves the results in the database and returns a flowable with those places retrieved by id from the database as Flowable:

@Query("select * from Places where placeId in (:placesIds)")
fun getPlaces(placesIds: List<String>) : Flowable<List<Place>>

Now everytime i change one of this objects I can see the change all throughout my app.

Now I want to combine these results with the distance from current location, like so:

 override fun addDistanceToPlaces(req: Flowable<List<Place>>): Flowable<List<Place>> {
        return req
                .zipWith(getLastLocation().toFlowable(BackpressureStrategy.LATEST),
                        BiFunction<List<Place>, Location, List<Place>> { places, location ->
                            places.forEach {
                                var placeLocation = Location(it.placeName)
                                placeLocation.latitude = it.latitude
                                placeLocation.longitude = it.longitude

                                it.distance = location.distanceTo(placeLocation)
                            }
                            places.sortedBy {
                                it.distance
                            }
                        })
                .onErrorResumeNext { t: Throwable ->
                    req
                }

    }

This works, however if I apply this, I lose "updates" from Room; changes are not notified to observers so I have to do a manual refresh.

Why is this happening? Shouldnt zip just be combining emissions from both sources?

like image 966
Gabriel Sanmartin Avatar asked Jun 12 '18 13:06

Gabriel Sanmartin


1 Answers

Your problem is trying to use the zip operator for your use case. Zip emits by pairing the input observable's values. It doesn't emit for every change to a single one of your observables, but rather when both of them emit. Check out the marble for it to help you visualize its behavior:

http://reactivex.io/documentation/operators/zip.html

So in your case the Room Observable is emitting into your zip function but the location Observable isn't updating so therefor you don't get your function called.

I think you are looking for the combineLatest operator. This will wait till both the Room Observable and Location Observable vend once, and then after that either observable can emit and your combine function will be invoked and subsequent value emitted to your app.

like image 70
Lance Johnson Avatar answered Oct 20 '22 08:10

Lance Johnson