Google's Android Architecture Components tutorial has a section that explains how to abstract the logic of fetching data over the network. In it, they create an abstract class called NetworkBoundResource that uses LiveData to build a reactive stream as the basis for all network requests.
public abstract class NetworkBoundResource<ResultType, RequestType> {
    private final AppExecutors appExecutors;
    private final MediatorLiveData<Resource<ResultType>> result = new MediatorLiveData<>();

    @MainThread
    NetworkBoundResource(AppExecutors appExecutors) {
        this.appExecutors = appExecutors;
        result.setValue(Resource.loading(null));
        LiveData<ResultType> dbSource = loadFromDb();
        result.addSource(dbSource, data -> {
            result.removeSource(dbSource);
            if (shouldFetch()) {
                fetchFromNetwork(dbSource);
            } else {
                result.addSource(dbSource, newData -> result.setValue(Resource.success(newData)));
            }
        });
    }

    private void fetchFromNetwork(final LiveData<ResultType> dbSource) {
        LiveData<ApiResponse<RequestType>> apiResponse = createCall();
        // we re-attach dbSource as a new source, it will dispatch its latest value quickly
        result.addSource(dbSource, newData -> result.setValue(Resource.loading(newData)));
        result.addSource(apiResponse, response -> {
            result.removeSource(apiResponse);
            result.removeSource(dbSource);
            //noinspection ConstantConditions
            if (response.isSuccessful()) {
                appExecutors.diskIO().execute(() -> {
                    saveCallResult(processResponse(response));
                    appExecutors.mainThread().execute(() ->
                            // we specially request a new live data,
                            // otherwise we will get immediately last cached value,
                            // which may not be updated with latest results received from network.
                            result.addSource(loadFromDb(),
                                    newData -> result.setValue(Resource.success(newData)))
                    );
                });
            } else {
                onFetchFailed();
                result.addSource(dbSource,
                        newData -> result.setValue(Resource.error(response.errorMessage, newData)));
            }
        });
    }

    protected void onFetchFailed() {
    }

    public LiveData<Resource<ResultType>> asLiveData() {
        return result;
    }

    @WorkerThread
    protected RequestType processResponse(ApiResponse<RequestType> response) {
        return response.body;
    }

    @WorkerThread
    protected abstract void saveCallResult(@NonNull RequestType item);

    @MainThread
    protected abstract boolean shouldFetch();

    @NonNull
    @MainThread
    protected abstract LiveData<ResultType> loadFromDb();

    @NonNull
    @MainThread
    protected abstract LiveData<ApiResponse<RequestType>> createCall();
}
From what I understand, the logic of this class is to:
a) Create a MediatorLiveData called "result" as the main return object and set its initial value to Resource.loading(null)
b) Get the data from the Android Room db as the dbSource LiveData and add it to "result" as a source LiveData
c) On dbSource LiveData's first emission, remove the dbSource LiveData from "result" and call "shouldFetch()", which decides whether to call fetchFromNetwork(dbSource) or to re-attach dbSource and publish its values as Resource.success
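The sequence of values that an observer of "result" would see can be traced without any Android dependency. The following is a minimal sketch in plain Java, not the class from the tutorial: ResourceFlowSketch, emissions, and the string-based states are hypothetical names made up for illustration, and the database and network are replaced by plain values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, synchronous stand-in for NetworkBoundResource: it records the
// sequence of states an observer of "result" would see.
final class ResourceFlowSketch {

    /**
     * @param cached       value currently in the local database
     * @param shouldFetch  what shouldFetch() would return
     * @param networkValue value returned by the network call, or null to simulate a failed call
     */
    static List<String> emissions(String cached, boolean shouldFetch, String networkValue) {
        List<String> out = new ArrayList<>();
        // a) initial value set in the constructor
        out.add("loading(null)");
        if (!shouldFetch) {
            // c) no fetch: the database value is published as success
            out.add("success(" + cached + ")");
            return out;
        }
        // c) fetch: the cached value is shown as loading while the call runs
        out.add("loading(" + cached + ")");
        if (networkValue != null) {
            // the response is saved, then the database is read again
            out.add("success(" + networkValue + ")");
        } else {
            // failed call: the error is paired with the cached value
            out.add("error(" + cached + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [loading(null), loading(old), success(new)]
        System.out.println(emissions("old", true, "new"));
    }
}
```

Any RxJava refactoring that aims to keep the same behaviour should produce the same three sequences for the fetch, failed-fetch, and no-fetch cases.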
Assuming this interpretation is correct, I have tried to refactor this class to use RxJava Observables instead of LiveData. This is my attempt (I removed the initial Resource.loading(null) as I see it as superfluous).
public abstract class NetworkBoundResource<ResultType, RequestType> {
    private Observable<Resource<ResultType>> result;

    @MainThread
    NetworkBoundResource() {
        Observable<Resource<ResultType>> source;
        if (shouldFetch()) {
            source = createCall()
                    .subscribeOn(Schedulers.io())
                    .doOnNext(apiResponse -> saveCallResult(processResponse(apiResponse)))
                    .flatMap(apiResponse -> loadFromDb().toObservable().map(Resource::success))
                    .doOnError(t -> onFetchFailed())
                    .onErrorResumeNext(t -> {
                        return loadFromDb()
                                .toObservable()
                                .map(data -> Resource.error(t.getMessage(), data));
                    })
                    .observeOn(AndroidSchedulers.mainThread());
        } else {
            source = loadFromDb()
                    .toObservable()
                    .map(Resource::success);
        }
        result = Observable.concat(
                loadFromDb()
                        .toObservable()
                        .map(Resource::loading)
                        .take(1),
                source
        );
    }

    public Observable<Resource<ResultType>> asObservable() { return result; }

    protected void onFetchFailed() {}

    @WorkerThread
    protected RequestType processResponse(ApiResponse<RequestType> response) { return response.body; }

    @WorkerThread
    protected abstract void saveCallResult(@NonNull RequestType item);

    @MainThread
    protected abstract boolean shouldFetch();

    @NonNull
    @MainThread
    protected abstract Flowable<ResultType> loadFromDb();

    @NonNull
    @MainThread
    protected abstract Observable<ApiResponse<RequestType>> createCall();
}
As I am new to RxJava, my question is: am I refactoring to RxJava correctly while maintaining the same logic as the LiveData version of this class?
public abstract class ApiRepositorySource<RawResponse extends BaseResponse, ResultType> {
    // result is a Flowable because Room Database only returns Flowables
    // Retrofit response will also be folded into the stream as a Flowable
    private Flowable<ApiResource<ResultType>> result;
    private AppDatabase appDatabase;

    @MainThread
    ApiRepositorySource(AppDatabase appDatabase) {
        this.appDatabase = appDatabase;
        Flowable<ApiResource<ResultType>> source;
        if (shouldFetch()) {
            source = createCall()
                    .doOnNext(this::saveCallResult)
                    .flatMap(apiResponse -> loadFromDb().toObservable().map(ApiResource::success))
                    .doOnError(this::onFetchFailed)
                    .onErrorResumeNext(t -> {
                        return loadFromDb()
                                .toObservable()
                                .map(data -> {
                                    ApiResource<ResultType> apiResource;
                                    // 400-level HTTP errors are reported as INVALID, everything else as ERROR
                                    if (t instanceof HttpException && ((HttpException) t).code() >= 400 && ((HttpException) t).code() < 500) {
                                        apiResource = ApiResource.invalid(t.getMessage(), data);
                                    } else {
                                        apiResource = ApiResource.error(t.getMessage(), data);
                                    }
                                    return apiResource;
                                });
                    })
                    .toFlowable(BackpressureStrategy.LATEST);
        } else {
            source = loadFromDb()
                    .subscribeOn(Schedulers.io())
                    .map(ApiResource::success);
        }
        // emit the cached value once as "loading", then continue with the source stream
        result = Flowable.concat(initLoadDb()
                        .map(ApiResource::loading)
                        .take(1),
                source)
                .subscribeOn(Schedulers.io());
    }

    public Observable<ApiResource<ResultType>> asObservable() {
        return result.toObservable();
    }

    @SuppressWarnings("WeakerAccess")
    protected void onFetchFailed(Throwable t) {
        Timber.e(t);
    }

    @WorkerThread
    protected void saveCallResult(@NonNull RawResponse resultType) {
        resultType.saveResponseToDb(appDatabase);
    }

    @MainThread
    protected abstract boolean shouldFetch();

    @NonNull
    @MainThread
    protected abstract Flowable<ResultType> loadFromDb();

    @NonNull
    @MainThread
    protected abstract Observable<RawResponse> createCall();

    @NonNull
    @MainThread
    protected Flowable<ResultType> initLoadDb() {
        return loadFromDb();
    }
}
The ApiRepositorySource class above is what I settled on after many iterations. It is currently in production and working well for my app. Here are some takeaways:
Create a BaseResponse interface
public interface BaseResponse {
    void saveResponseToDb(AppDatabase appDatabase);
}
and implement it in all of your API response classes. This way you don't have to implement the save-to-database logic in every ApiRepositorySource; saveCallResult can simply default to whatever the response's own implementation is, if you want.
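To illustrate why this removes per-resource save code, here is a minimal sketch under stated assumptions. FakeDatabase, SavableResponse, UserResponseSketch, and SaveSketch are hypothetical names; FakeDatabase is an in-memory stand-in for the Room AppDatabase, so this is not the code that runs in my app.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the Room AppDatabase.
final class FakeDatabase {
    final Map<Long, String> userNames = new HashMap<>();
}

// Same shape as BaseResponse, but written against the stand-in database.
interface SavableResponse {
    void saveResponseToDb(FakeDatabase db);
}

// Hypothetical response object that knows how to persist itself.
final class UserResponseSketch implements SavableResponse {
    final long id;
    final String name;

    UserResponseSketch(long id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public void saveResponseToDb(FakeDatabase db) {
        db.userNames.put(id, name);
    }
}

final class SaveSketch {
    // Generic save step: it works for any response type, so a resource
    // class does not need its own save implementation.
    static <R extends SavableResponse> void saveCallResult(R response, FakeDatabase db) {
        response.saveResponseToDb(db);
    }

    public static void main(String[] args) {
        FakeDatabase db = new FakeDatabase();
        saveCallResult(new UserResponseSketch(7L, "Ada"), db);
        System.out.println(db.userNames.get(7L));
    }
}
```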
I have chosen to handle Retrofit error responses in the onErrorResumeNext block for simplicity, but I recommend you create a Transformer class that can hold all this logic. In this case, I added an extra Status enum value for ApiResources called INVALID for 400-level responses.
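The classification itself is easy to pull out into its own unit, which is the first step towards such a Transformer. The following is only a sketch of that idea in plain Java: StatusSketch, HttpFailure, and ErrorClassifier are hypothetical names, and HttpFailure stands in for Retrofit's HttpException so the example has no library dependency.

```java
// Hypothetical status values; INVALID is the extra value for 400-level responses.
enum StatusSketch { SUCCESS, INVALID, ERROR }

// Hypothetical stand-in for an HTTP error that carries a status code.
final class HttpFailure extends RuntimeException {
    final int code;

    HttpFailure(int code) {
        super("HTTP " + code);
        this.code = code;
    }
}

final class ErrorClassifier {
    // 400-level HTTP failures map to INVALID; every other failure,
    // including non-HTTP errors such as IOException, maps to ERROR.
    static StatusSketch classify(Throwable t) {
        if (t instanceof HttpFailure) {
            int code = ((HttpFailure) t).code;
            if (code >= 400 && code < 500) {
                return StatusSketch.INVALID;
            }
        }
        return StatusSketch.ERROR;
    }

    public static void main(String[] args) {
        System.out.println(classify(new HttpFailure(404))); // prints INVALID
        System.out.println(classify(new HttpFailure(503))); // prints ERROR
    }
}
```

Keeping this decision in one place means every repository source reports 400-level failures the same way.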
You might be tempted to use the Reactive Streams architecture component library for LiveData
implementation "android.arch.lifecycle:reactivestreams:$lifecycle_version"
and add a method to this class called
public LiveData<ApiResource<ResultType>> asLiveData() {
    return LiveDataReactiveStreams.fromPublisher(result);
}
In theory, this would work perfectly, as our ViewModels wouldn't have to convert Observable emissions to LiveData emissions or implement lifecycle logic for Observables in Views. Unfortunately, this stream gets rebuilt on every configuration change because the LiveData is disposed of whenever onDestroy is called (whether isFinishing is true or false). Thus, we either have to manage the lifecycle of this stream ourselves, which defeats the purpose of using it in the first place, or accept duplicated calls every time the device rotates.
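The cost of rebuilding can be shown with a small counting sketch. It uses neither LiveData nor RxJava: RotationSketch, CountingCall, and CachedResult are hypothetical names, the "network call" is a plain Supplier, and CachedResult is only an assumption about what a holder that outlives the view could look like.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RotationSketch {

    // Simulated network call that counts how often it is executed.
    static final class CountingCall implements Supplier<String> {
        final AtomicInteger executions = new AtomicInteger();

        @Override
        public String get() {
            executions.incrementAndGet();
            return "user";
        }
    }

    // Holder meant to outlive a single screen instance; it runs the call at most once.
    static final class CachedResult {
        private final Supplier<String> call;
        private String value;

        CachedResult(Supplier<String> call) {
            this.call = call;
        }

        String get() {
            if (value == null) {
                value = call.get();
            }
            return value;
        }
    }

    // Initial creation plus each rotation rebuilds the stream, so the call runs every time.
    static int callsWhenRebuilt(int rotations) {
        CountingCall call = new CountingCall();
        for (int i = 0; i <= rotations; i++) {
            call.get();
        }
        return call.executions.get();
    }

    // Initial creation plus each rotation reads from the same long-lived holder.
    static int callsWhenCached(int rotations) {
        CountingCall call = new CountingCall();
        CachedResult holder = new CachedResult(call);
        for (int i = 0; i <= rotations; i++) {
            holder.get();
        }
        return call.executions.get();
    }
}
```

With two rotations, callsWhenRebuilt(2) returns 3 while callsWhenCached(2) returns 1, which is the duplication described above.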
Here is an example of a UserRepository creating an instance of an ApiRepositorySource:
@Singleton
public class UserRepository {
    private final RetrofitApi retrofitApi;
    private final AppDatabase appDatabase;

    @Inject
    UserRepository(RetrofitApi retrofitApi, AppDatabase appDatabase) {
        this.retrofitApi = retrofitApi;
        this.appDatabase = appDatabase;
    }

    public Observable<ApiResource<User>> getUser(long userId) {
        return new ApiRepositorySource<UserResponse, User>(appDatabase) {
            @Override
            protected boolean shouldFetch() {
                return true;
            }

            @NonNull
            @Override
            protected Flowable<User> loadFromDb() {
                return appDatabase.userDao().getUserFlowable(userId);
            }

            @NonNull
            @Override
            protected Observable<UserResponse> createCall() {
                return retrofitApi.getUserById(userId);
            }
        }.asObservable();
    }
}