I want to use RxJava to load data from a web service (via Retrofit). I also have a database cache of previous results.
Let assume I already have observables for each of these:
Observable<List<MyModel>> networkObservable = retrofitService.getModels();
Observable<List<MyModel>> dbObservable = database.getModels();
I want to merge these 2 observables into one:
public class MyModelHelper {
public static Observable<List<MyModel>> getModels() {
// TODO: Help!
}
}
The behaviour I want is for subscribers to receive the database results as soon as available, followed by the restService results when they come in (assuming fetching from the db is faster than making the network call)
The best I could come up with on my own is:
public class MyModelHelper {
public static Observable<List<MyModel>> getModels() {
List<MyModel> emptyList = new LinkedList<>();
// 'startWith' because combineLatest wont call back until all source observables emit something
Observable.combineLatest(dbObservable.startWith(emptyList),
networkObservable.startWith(emptyList),
new Func2<List<MyModel>, List<MyModel>, List<MyModel>>() {
@Override
public List<MyModel> call(List<MyModel> first, List<MyModel> second) {
return merge(first, second);
}
});
}
}
This seems a little hacky to me and I feel for such a common scenario there must be a better solution.
It would also be nice if an error occurred in the network observable, that the db results would still be passed through. I could call onErrorResumeNext()
and return the dbObservable itself, but I would still like subscribers to be notified that an error occurred.
Any suggestions?
Use Observable.merge
directly. It merges several observable streams to one, so if database emits faster, you will receive it first.
public static Observable<List<MyModel>> getModels() {
return Observable.merge(dbObservable, networkObservable);
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With