I am using RxJava/Kotlin with Room and Retrofit. I am sure I'm not making something write as I just started learning RxJava. The scenario is that I make a call to check if there are favorite records in DB and get them in a List, fetch data from API and insert it in DB, update the DB with the previous favorites list and get all records as an, now updated, List. I get the result in my fragment but each time I get it it's as if I get 1 less favorite item until I get no favorite items.
Repository
fun getKafaniFromApi(): Observable<List<Kafana>> {
return apiService.getKafani().toObservable().doOnNext {
insertKafaniInDb(it)
}
}
fun getKafaniFromDb(): Observable<List<Kafana>> {
return kafanaDao.getKafani().toObservable()
}
fun insertKafaniInDb(kafani: List<Kafana>) {
Observable.fromCallable { kafanaDao.insertAll(kafani) }
.subscribeOn(Schedulers.io())
.subscribe {
Timber.d("Inserted ${kafani.size} kafani from API in DB...")
}
}
fun getFavoriteKafani(): Single<List<Kafana>> {
return kafanaDao.getFavoriteKafani()
}
fun setKafanaFavorite(kafana: Kafana, isFavorite: Int) {
return kafanaDao.setFavourite(kafana.name, isFavorite)
}
fun updateFavoriteKafana(kafana: Kafana) {
return kafanaDao.updateFavoriteKafana(kafana)
}
And in my viewmodel
fun get(): Observable<List<Kafana>> {
return kafanaRepository.getFavoriteKafani()
.toObservable()
.doOnNext { kafaniList = it }
.flatMap { kafanaRepository.getKafaniFromApi() }
.doOnNext { kafaniList?.forEach { kafanaRepository.updateFavoriteKafana(it) } }
.flatMap { kafanaRepository.getKafaniFromDb() }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
I will actually get the list in my fragment but, as I said, it will always be one less favorite until there are none.
First of all try not rely on Side effects, that make things unpredictable .. for example this function
fun insertKafaniInDb(kafani: List<Kafana>) {
Observable.fromCallable { kafanaDao.insertAll(kafani) }
.subscribeOn(Schedulers.io())
.subscribe {
Timber.d("Inserted ${kafani.size} kafani from API in DB...")
}
}
its return type is Unit, it's better to include it in the stream, this is done by converting to Completable
so it will be something like this
fun insertKafaniInDb(kafani: List<Kafana>) {
return Observable.fromAction { kafanaDao.insertAll(kafani) }
.subscribeOn(Schedulers.io())
.doOnComplete { Timber.d("Inserted ${kafani.size} kafani from API in DB...") }
}
other functions that return Unit (void in java) should be converted the same way to completabel. so now I'll try to rewrite your logic without using side effects. and explaining each step.
fun getUpdatedData(): Single<MutableList<String>>? {
return kafanaRepository.getFavoriteKafani()
.toObservable()
.flatMap { Observable.fromIterable(it) } //to iterate on favorite items
.flatMap { localItem ->
kafanaRepository.getKafaniFromApi()
.flatMap { Observable.fromIterable(it) } //to iterate api items
.filter { localItem == it } //search for the favorite item in Api response
.flatMap {
//we update this item then we pass it after update
kafanaRepository.updateFavoriteKafana(it)
.andThen(Observable.just(it))
}
.defaultIfEmpty(localItem) //if it's not found, then no update needed we take this it.
}.toList() // we collect the updated and non updated local items to list
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
Hope this could help you.
Final words .. Rx is about ordering your thoughts and plug your logic in functional way .. try to avoid using onNext()
to update global variables, use it just for logging and non Business logic.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With