I have a getPlaces
method in my repository:
override fun getPlaces(filter: FilterRequest): Flowable<List<Place>> {
return from(placesApi.filter(filter))
.doOnSuccess {
placesDao.savePlaces(it)
}
.flatMapPublisher { it ->
placesDao.getPlaces(it.map { it.placeId })
}
}
This method gathers the result from an api, then saves the results in the database and returns a flowable with those places retrieved by id from the database as Flowable
:
@Query("select * from Places where placeId in (:placesIds)")
fun getPlaces(placesIds: List<String>) : Flowable<List<Place>>
Now everytime i change one of this objects I can see the change all throughout my app.
Now I want to combine these results with the distance from current location, like so:
override fun addDistanceToPlaces(req: Flowable<List<Place>>): Flowable<List<Place>> {
return req
.zipWith(getLastLocation().toFlowable(BackpressureStrategy.LATEST),
BiFunction<List<Place>, Location, List<Place>> { places, location ->
places.forEach {
var placeLocation = Location(it.placeName)
placeLocation.latitude = it.latitude
placeLocation.longitude = it.longitude
it.distance = location.distanceTo(placeLocation)
}
places.sortedBy {
it.distance
}
})
.onErrorResumeNext { t: Throwable ->
req
}
}
This works, however if I apply this, I lose "updates" from Room; changes are not notified to observers so I have to do a manual refresh.
Why is this happening? Shouldnt zip
just be combining emissions from both sources?
Your problem is trying to use the zip
operator for your use case. Zip emits by pairing the input observable's values. It doesn't emit for every change to a single one of your observables, but rather when both of them emit. Check out the marble for it to help you visualize its behavior:
http://reactivex.io/documentation/operators/zip.html
So in your case the Room Observable is emitting into your zip function but the location Observable isn't updating so therefor you don't get your function called.
I think you are looking for the combineLatest
operator. This will wait till both the Room Observable and Location Observable vend once, and then after that either observable can emit and your combine function will be invoked and subsequent value emitted to your app.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With