I have a sample API request that returns a user's watchlist. I want to achieve the following flow when the user loads the watchlist screen:
1. Load the data from the DB cache immediately (cacheWatchList).
2. Initiate the Retrofit network call in the background.
   i. onSuccess: return apiWatchList
   ii. onError: return cacheWatchList
3. Diff cacheWatchList vs apiWatchList.
   i. Same -> all is well; the data is already displayed to the user, so do nothing.
   ii. Differs -> save apiWatchList to the local store and send apiWatchList downstream.
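One way to pin down the expected behaviour before wiring any Rx operators is to model the flow above as a pure function. This is only an illustrative sketch: the fields of Repository, and the names ApiResult, Outcome and resolve, are hypothetical and not part of my code. It also assumes that on a network error nothing new needs to be emitted, because the cached list is already on screen.

```kotlin
// Hypothetical stand-in for the Repository entity used in the question.
data class Repository(val id: Int, val name: String)

// Result of the network call: either a list of items or a failure.
sealed class ApiResult {
    data class Success(val items: List<Repository>) : ApiResult()
    data class Failure(val cause: Throwable) : ApiResult()
}

// What the presenter should receive, in order, and whether the cache must be rewritten.
data class Outcome(val emissions: List<List<Repository>>, val saveToCache: Boolean)

fun resolve(cache: List<Repository>, api: ApiResult): Outcome = when (api) {
    // Step 2.ii: on error, the cached list (already displayed) is all there is.
    is ApiResult.Failure -> Outcome(listOf(cache), saveToCache = false)
    is ApiResult.Success ->
        if (api.items == cache) {
            // Step 3.i: same data, nothing more to emit.
            Outcome(listOf(cache), saveToCache = false)
        } else {
            // Step 3.ii: persist the fresh list and emit it after the cached one.
            Outcome(listOf(cache, api.items), saveToCache = true)
        }
}

fun main() {
    val cached = listOf(Repository(1, "rxjava"))
    val fresh = cached + Repository(2, "retrofit")

    println(resolve(cached, ApiResult.Success(cached)).emissions.size)          // 1
    println(resolve(cached, ApiResult.Success(fresh)).emissions.size)           // 2
    println(resolve(cached, ApiResult.Failure(Exception("offline"))).emissions.size) // 1
}
```

The list comparison relies on Repository being a data class, so that == compares contents rather than references.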
What have I done so far?
Watchlist.kt
data class Watchlist(
    val items: List<Repository> = emptyList()
)
LocalStore.kt (Android room)
fun saveUserWatchlist(repositories: List<Repository>): Completable {
    return Completable.fromCallable {
        watchlistDao.saveAllUserWatchlist(*repositories.toTypedArray())
    }
}
RemoteStore.kt (Retrofit api call)
fun getWatchlist(userId: UUID): Single<Watchlist?> {
    return api.getWatchlist(userId)
}
DataManager.kt
fun getWatchlist(userId: UUID): Flowable<List<Repository>?> {
    val localSource: Single<List<Repository>?> =
        localStore.getUserWatchlist()
            .subscribeOn(scheduler.computation)
    val remoteSource: Single<List<Repository>> = remoteStore.getWatchlist(userId)
        .map(Watchlist::items)
        .doOnSuccess { items: List<Repository> ->
            localStore.saveUserWatchlist(items)
                .subscribeOn(scheduler.io)
                .subscribe()
        }
        .onErrorResumeNext { throwable ->
            if (throwable is IOException) {
                return@onErrorResumeNext localStore.getUserWatchlist()
            }
            return@onErrorResumeNext Single.error(throwable)
        }
        .subscribeOn(scheduler.io)
    return Single.concat(localSource, remoteSource)
}
The problem with the above flow is that it calls onNext twice on the downstream (presenter), once for each source, even when both lists are the same.
I could do the diff logic in the presenter and update accordingly, but I want the DataManager class to handle that logic for me (Clean Architecture, SoC).
My questions:
What's the best way to implement the above logic?
Am I leaking the inner subscriptions in DataManager (see the doOnSuccess code)? I'm disposing of the outer subscription when the presenter is destroyed.
fun getWatchlist(userId: UUID): Observable<List<Repository>?> {
    val remoteSource: Single<List<Repository>> =
        remoteStore.getWatchlist(userId)
            .map(Watchlist::items)
            .subscribeOn(scheduler.io)
    return localStore.getUserWatchlist()
        .flatMapObservable { listFromLocal: List<Repository> ->
            remoteSource
                .observeOn(scheduler.computation)
                .toObservable()
                // Drop the API result when it matches what is already cached.
                .filter { apiWatchList: List<Repository> ->
                    apiWatchList != listFromLocal
                }
                // Save the new list first, then pass it downstream.
                .flatMapSingle { apiWatchList ->
                    localStore.saveUserWatchlist(apiWatchList)
                        .andThen(Single.just(apiWatchList))
                }
                // Emit the cached list before anything from the network.
                .startWith(listFromLocal)
        }
}
Explanation step by step:
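The chain can be read against the three steps in the question. What follows is a minimal, self-contained sketch rather than the exact production code: FakeLocalStore, the fields of Repository, and passing remoteSource in as a parameter are all hypothetical stand-ins so that it runs without Room or Retrofit. Schedulers are left out to keep the output deterministic, and the IOException fallback is borrowed from the question's own onErrorResumeNext. It assumes RxJava 2 (the io.reactivex package), which matches the startWith call used above.

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.Single
import java.io.IOException

// Hypothetical stand-in for the Repository entity.
data class Repository(val id: Int, val name: String)

// Hypothetical in-memory replacement for the Room-backed LocalStore.
class FakeLocalStore(private var cached: List<Repository>) {
    fun getUserWatchlist(): Single<List<Repository>> = Single.fromCallable { cached }
    fun saveUserWatchlist(items: List<Repository>): Completable =
        Completable.fromAction { cached = items }
}

fun getWatchlist(
    localStore: FakeLocalStore,
    remoteSource: Single<List<Repository>>
): Observable<List<Repository>> =
    localStore.getUserWatchlist()
        .flatMapObservable { listFromLocal ->
            remoteSource
                // Step 2.ii: on a network failure fall back to the cached list,
                // which the filter below then drops as a duplicate.
                .onErrorResumeNext { t ->
                    if (t is IOException) Single.just(listFromLocal)
                    else Single.error<List<Repository>>(t)
                }
                .toObservable()
                // Step 3.i: identical data is not emitted a second time.
                .filter { apiWatchList -> apiWatchList != listFromLocal }
                // Step 3.ii: save, then forward. The save is part of this chain,
                // so there is no separate inner subscribe() to track.
                .flatMapSingle { apiWatchList ->
                    localStore.saveUserWatchlist(apiWatchList)
                        .andThen(Single.just(apiWatchList))
                }
                // Step 1: the cached list always goes out first.
                .startWith(listFromLocal)
        }

fun main() {
    val cached = listOf(Repository(1, "rxjava"))
    val fresh = cached + Repository(2, "retrofit")
    val offline = Single.error<List<Repository>>(IOException("offline"))

    fun count(remote: Single<List<Repository>>): Int =
        getWatchlist(FakeLocalStore(cached), remote).toList().blockingGet().size

    println(count(Single.just(cached))) // 1: same data, cache only
    println(count(Single.just(fresh)))  // 2: cache, then the API list
    println(count(offline))             // 1: network error, cache only
}
```

On the leak question: because the save is composed with flatMapSingle and andThen instead of a nested subscribe(), it should be disposed together with the outer subscription when the presenter is destroyed. The doOnSuccess version in the question starts an independent subscription that the outer Disposable does not control.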