I am implementing the repository pattern in RxJava, using SqlBrite/SqlDelight for offline data storage and Retrofit for HTTP requests.
Here's a sample of that:
protected Observable<List<Item>> getItemsFromDb() {
    return database.createQuery(tableName(), selectAllStatement())
            .mapToList(cursor -> selectAllMapper().map(cursor));
}

public Observable<List<Item>> getItems() {
    Observable<List<Item>> server = getRequest()
            .doOnNext(items -> {
                // Write all server items in a single transaction
                BriteDatabase.Transaction transaction = database.newTransaction();
                try {
                    for (Item item : items) {
                        database.insert(tableName(), contentValues(item));
                    }
                    transaction.markSuccessful();
                } finally {
                    // Always end the transaction, even if an insert throws
                    transaction.end();
                }
            })
            .flatMap(items -> getItemsFromDb())
            .delaySubscription(200, TimeUnit.MILLISECONDS);
    Observable<List<Item>> db = getItemsFromDb()
            .filter(items -> items != null && items.size() > 0);
    return Observable.amb(db, server)
            .doOnSubscribe(() -> server.subscribe(items -> {}, throwable -> {}));
}
The current implementation uses Observable.amb to mirror whichever of the two streams emits first: it returns the db stream if db has data, and the server stream otherwise. To prevent early failure when there is no internet, server has a delaySubscription of 200 ms on it.
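To make the "first stream to emit wins" behaviour concrete, here is a rough plain-JDK analogy rather than RxJava code. It assumes CompletableFuture.anyOf as a stand-in for Observable.amb, a future that never completes as a stand-in for the filtered-out empty db stream, and CompletableFuture.delayedExecutor as a stand-in for delaySubscription. The names AmbAnalogy and firstOf are made up for illustration, and a future carries a single value where an Observable can emit many.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Analogy only: CompletableFuture.anyOf plays the role of Observable.amb.
final class AmbAnalogy {

    // Returns the value of whichever source finishes first.
    // `db` may never complete (the "no rows, filtered out" case);
    // the server value is produced only after `delayMillis`,
    // loosely mirroring delaySubscription.
    static List<String> firstOf(CompletableFuture<List<String>> db,
                                List<String> serverItems,
                                long delayMillis) {
        CompletableFuture<List<String>> server = CompletableFuture.supplyAsync(
                () -> serverItems,
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
        @SuppressWarnings("unchecked")
        List<String> winner = (List<String>) CompletableFuture.anyOf(db, server).join();
        return winner;
    }

    public static void main(String[] args) {
        // The database already has data, so it wins the race.
        CompletableFuture<List<String>> dbWithData =
                CompletableFuture.completedFuture(List.of("cached"));
        System.out.println(firstOf(dbWithData, List.of("fresh"), 200)); // prints [cached]

        // The database never emits, so the delayed server result wins.
        CompletableFuture<List<String>> emptyDb = new CompletableFuture<>();
        System.out.println(firstOf(emptyDb, List.of("fresh"), 200)); // prints [fresh]
    }
}
```

The analogy also shows the weakness of the delay: it only biases the race towards the database, and it does not express "always show local data first, then refresh".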
I tried using Observable.concat, but the SqlBrite stream never calls onCompleted, so the server observable is never subscribed to.
I also tried Observable.combineLatest, which didn't work because it keeps waiting for the server observable to return data before emitting anything. Observable.switchOnNext didn't work either.
What I am looking for is a repository which:
This is how you can solve the problem above, i.e., fetching data from two sources (local and remote) and sending an update to the UI only when required.
The Data class wraps your data and also records which source it came from:
class Data<T> {
    static final int STATE_LOCAL = 0;
    static final int STATE_SERVER = 1;

    private final T data;
    private final int state;

    Data(T data, int state) {
        this.data = data;
        this.state = state;
    }

    public int getState() { return state; }
    public T getData() { return data; }
}
...
public Observable<Model> getData(long id) {
    // Used to cache data and compare it with server data, so we can avoid unnecessary UI updates
    BehaviorSubject<Data<Model>> publishSubject = BehaviorSubject.create();
    publishSubject.onNext(new Data<>(null, Data.STATE_LOCAL));

    Observable<Data<Model>> server = getRequest()
            .map(items -> new Data<>(items, Data.STATE_SERVER))
            // Here we are combining data from server and our `BehaviorSubject`
            // If any one has ideas how to do this without the subject, I'll be glad to hear it.
            .flatMap(items -> Observable.zip(publishSubject.take(1), Observable.just(items), Pair::new))
            .flatMap(oldNewPair -> {
                // Here we are comparing old and new data to see if there was any new data returned from server
                Data<Model> prevData = oldNewPair.first;
                Data<Model> newData = oldNewPair.second;
                // Could be any condition to compare the old and new data
                // (== assumes updated_at() returns a primitive such as long)
                if (prevData.getData() != null
                        && prevData.getData().updated_at() == newData.getData().updated_at()) {
                    return Observable.just(prevData);
                } else {
                    // New data: store it. If getFromDb(id) is a SqlBrite query, it re-emits after this insert.
                    database.insert(tableName(), contentValues(newData.getData()));
                    return Observable.just(newData);
                }
            });

    return getFromDb(id)
            .map(item -> new Data<>(item, Data.STATE_LOCAL))
            .onErrorResumeNext(server)
            .doOnNext(item -> {
                publishSubject.onNext(item);
                if (item.getState() == Data.STATE_LOCAL) {
                    // Refresh from the server in the background; errors (e.g. no internet) are ignored
                    server.subscribeOn(Schedulers.io())
                            .observeOn(Schedulers.io())
                            .subscribe(data -> {}, throwable -> {});
                }
            })
            .map(item -> item.getData());
}
This solution does not use amb. It uses a BehaviorSubject instead, which solves the following problems:
It no longer needs delaySubscription, which was used earlier to prevent early failure when there is no internet.
Earlier, two calls were made to the server each time; that no longer happens.
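The decision logic behind this answer can be sketched without any libraries. The following is a simplified, synchronous plain-Java illustration, not the RxJava/SqlBrite implementation above: the class OfflineFirstSketch, its nested Model, the remoteCalls counter and the load method are all hypothetical names, a field stands in for the database row, and a Supplier stands in for the HTTP request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java sketch of "show local data first, update only if the server copy differs".
final class OfflineFirstSketch {

    // Hypothetical model: just enough fields for the comparison.
    static final class Model {
        final long id;
        final long updatedAt;
        final String body;

        Model(long id, long updatedAt, String body) {
            this.id = id;
            this.updatedAt = updatedAt;
            this.body = body;
        }
    }

    private Model local;                  // stands in for the database row
    private final Supplier<Model> remote; // stands in for the HTTP request
    int remoteCalls = 0;                  // counts server requests, for illustration

    OfflineFirstSketch(Model local, Supplier<Model> remote) {
        this.local = local;
        this.remote = remote;
    }

    // Emits the local copy (if any), asks the server once, and emits again
    // only when the server copy has a different updatedAt.
    void load(Consumer<Model> ui) {
        if (local != null) {
            ui.accept(local);
        }
        Model fresh;
        try {
            remoteCalls++;
            fresh = remote.get();
        } catch (RuntimeException offline) {
            return; // no network: the UI keeps whatever local data it already got
        }
        if (local == null || local.updatedAt != fresh.updatedAt) {
            local = fresh;    // "insert into the database"
            ui.accept(fresh); // "the database query re-emits"
        }
    }

    public static void main(String[] args) {
        List<String> shown = new ArrayList<>();
        Model cached = new Model(1L, 100L, "cached");

        // Server copy is unchanged: only the cached value is shown.
        new OfflineFirstSketch(cached, () -> new Model(1L, 100L, "cached"))
                .load(m -> shown.add(m.body));
        // Server copy is newer: cached value first, then the fresh one.
        new OfflineFirstSketch(cached, () -> new Model(1L, 200L, "fresh"))
                .load(m -> shown.add(m.body));

        System.out.println(shown); // prints [cached, cached, fresh]
    }
}
```

In the sketch, a failed request simply leaves the local value on screen, which is the role delaySubscription was trying to play in the amb version.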