Refactoring Google's NetworkBoundResource class to use RxJava instead of LiveData

Google's Android Architecture Components guide has a section that explains how to abstract the logic of fetching data over the network. In it, they create an abstract class called NetworkBoundResource, which uses LiveData to expose a reactive stream that serves as the basis for all network requests.

public abstract class NetworkBoundResource<ResultType, RequestType> {
private final AppExecutors appExecutors;

private final MediatorLiveData<Resource<ResultType>> result = new MediatorLiveData<>();

@MainThread
NetworkBoundResource(AppExecutors appExecutors) {
    this.appExecutors = appExecutors;
    result.setValue(Resource.loading(null));
    LiveData<ResultType> dbSource = loadFromDb();
    result.addSource(dbSource, data -> {
        result.removeSource(dbSource);
        if (shouldFetch()) {
            fetchFromNetwork(dbSource);
        } else {
            result.addSource(dbSource, newData -> result.setValue(Resource.success(newData)));
        }
    });
}

private void fetchFromNetwork(final LiveData<ResultType> dbSource) {
    LiveData<ApiResponse<RequestType>> apiResponse = createCall();
    // we re-attach dbSource as a new source, it will dispatch its latest value quickly
    result.addSource(dbSource, newData -> result.setValue(Resource.loading(newData)));
    result.addSource(apiResponse, response -> {
        result.removeSource(apiResponse);
        result.removeSource(dbSource);
        //noinspection ConstantConditions
        if (response.isSuccessful()) {
            appExecutors.diskIO().execute(() -> {
                saveCallResult(processResponse(response));
                appExecutors.mainThread().execute(() ->
                        // we specially request a new live data,
                        // otherwise we will get immediately last cached value,
                        // which may not be updated with latest results received from network.
                        result.addSource(loadFromDb(),
                                newData -> result.setValue(Resource.success(newData)))
                );
            });
        } else {
            onFetchFailed();
            result.addSource(dbSource,
                    newData -> result.setValue(Resource.error(response.errorMessage, newData)));
        }
    });
}

protected void onFetchFailed() {
}

public LiveData<Resource<ResultType>> asLiveData() {
    return result;
}

@WorkerThread
protected RequestType processResponse(ApiResponse<RequestType> response) {
    return response.body;
}

@WorkerThread
protected abstract void saveCallResult(@NonNull RequestType item);

@MainThread
protected abstract boolean shouldFetch();

@NonNull
@MainThread
protected abstract LiveData<ResultType> loadFromDb();

@NonNull
@MainThread
protected abstract LiveData<ApiResponse<RequestType>> createCall();
}

From what I understand, the logic of this class is to:

a) Create a MediatorLiveData called "result" as the main return object and set its initial value to Resource.loading(null).

b) Load the data from the Room database as the dbSource LiveData and add it to "result" as a source.

c) On dbSource's first emission, remove dbSource from "result" and call "shouldFetch()".

  IF TRUE:
  1. Call "fetchFromNetwork(dbSource)", which creates a network call through "createCall()" that returns a LiveData of the response wrapped in an ApiResponse object.
  2. Add dbSource back to "result" and emit its values as Resource.loading(data).
  3. Add the apiResponse LiveData to "result" and, on its first emission, remove both dbSource and apiResponse from "result".
  4. If the response is successful, call "saveCallResult(processResponse(response))" on the disk I/O executor, then add a fresh LiveData from "loadFromDb()" to "result" and emit its values as Resource.success(newData).
  5. If the response failed, call "onFetchFailed()", add dbSource back to "result", and emit its values as Resource.error(response.errorMessage, newData).

  IF FALSE:
  6. Add dbSource back to "result" and emit its values as Resource.success(newData).
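
The sequence of values that "result" is expected to emit can be sketched in plain Java, without LiveData or threading. This is only an illustration of the ordering described in the steps, not the sample's implementation: the Resource class here is a simplified stand-in, EmissionSketch and its parameters are hypothetical names, the one-element array plays the role of the Room database, and the Supplier plays the role of the network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for the sample's Resource wrapper. */
final class Resource<T> {
    enum Status { LOADING, SUCCESS, ERROR }

    final Status status;
    final T data;
    final String message;

    private Resource(Status status, T data, String message) {
        this.status = status;
        this.data = data;
        this.message = message;
    }

    static <T> Resource<T> loading(T data) { return new Resource<>(Status.LOADING, data, null); }
    static <T> Resource<T> success(T data) { return new Resource<>(Status.SUCCESS, data, null); }
    static <T> Resource<T> error(String message, T data) { return new Resource<>(Status.ERROR, data, message); }
}

/** Hypothetical synchronous walk-through of the emission order. */
final class EmissionSketch {
    private EmissionSketch() {}

    /**
     * db is a one-element array acting as the database row (assumption);
     * network returns the fetched value or throws to simulate a failed call.
     */
    static List<Resource<String>> emissions(String[] db, boolean shouldFetch, Supplier<String> network) {
        List<Resource<String>> out = new ArrayList<>();
        out.add(Resource.loading(null));       // initial value set in the constructor
        String cached = db[0];                 // first emission of the database source
        if (!shouldFetch) {
            out.add(Resource.success(cached)); // no fetch: database value is the result
            return out;
        }
        out.add(Resource.loading(cached));     // cached value shown while the call runs
        try {
            db[0] = network.get();             // save the response to the database
            out.add(Resource.success(db[0]));  // re-read the database after saving
        } catch (RuntimeException e) {
            out.add(Resource.error(e.getMessage(), cached)); // failure keeps the cached value
        }
        return out;
    }
}
```

Calling EmissionSketch.emissions(new String[] {"cached"}, true, () -> "fresh") yields three values in order: loading with no data, loading with the cached value, then success with the fetched value.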

Assuming this interpretation is correct, I have tried to refactor this class to use RxJava Observables instead of LiveData. This is my attempt (I removed the initial Resource.loading(null) because it seems superfluous to me).

public abstract class NetworkBoundResource<ResultType, RequestType> {

private Observable<Resource<ResultType>> result;

@MainThread
NetworkBoundResource() {
    Observable<Resource<ResultType>> source;
    if (shouldFetch()) {
        source = createCall()
                .subscribeOn(Schedulers.io())
                .doOnNext(apiResponse -> saveCallResult(processResponse(apiResponse)))
                .flatMap(apiResponse -> loadFromDb().toObservable().map(Resource::success))
                .doOnError(t -> onFetchFailed())
                .onErrorResumeNext(t -> {
                    return loadFromDb()
                            .toObservable()
                            .map(data -> Resource.error(t.getMessage(), data));
                })
                .observeOn(AndroidSchedulers.mainThread());
    } else {
        source = loadFromDb()
                .toObservable()
                .map(Resource::success);
    }

    result = Observable.concat(
            loadFromDb()
                    .toObservable()
                    .map(Resource::loading)
                    .take(1),
            source
    );
}

public Observable<Resource<ResultType>> asObservable() {return result;}

protected void onFetchFailed() {}

@WorkerThread
protected RequestType processResponse(ApiResponse<RequestType> response) {return response.body;}

@WorkerThread
protected abstract void saveCallResult(@NonNull RequestType item);

@MainThread
protected abstract boolean shouldFetch();

@NonNull
@MainThread
protected abstract Flowable<ResultType> loadFromDb();

@NonNull
@MainThread
protected abstract Observable<ApiResponse<RequestType>> createCall();
}

As I am new to RxJava, my question is: am I refactoring to RxJava correctly and preserving the logic of the LiveData version of this class?

flopshot asked Dec 24 '22 08:12


1 Answer

public abstract class ApiRepositorySource<RawResponse extends BaseResponse, ResultType> {

    // result is a Flowable because Room Database only returns Flowables
    // Retrofit response will also be folded into the stream as a Flowable
    private Flowable<ApiResource<ResultType>> result; 
    private AppDatabase appDatabase;

    @MainThread
    ApiRepositorySource(AppDatabase appDatabase) {
        this.appDatabase = appDatabase;
        Flowable<ApiResource<ResultType>> source;
        if (shouldFetch()) {
            source = createCall()
                .doOnNext(this::saveCallResult)
                .flatMap(apiResponse -> loadFromDb().toObservable().map(ApiResource::success))
                .doOnError(this::onFetchFailed)
                .onErrorResumeNext(t -> {
                    return loadFromDb()
                            .toObservable()
                            .map(data -> {
                                ApiResource<ResultType> apiResource;

                                if (t instanceof HttpException && ((HttpException) t).code() >= 400 && ((HttpException) t).code() < 500) {
                                    apiResource = ApiResource.invalid(t.getMessage(), data);
                                } else {
                                    apiResource = ApiResource.error(t.getMessage(), data);
                                }

                                return apiResource;
                            });
                })
                .toFlowable(BackpressureStrategy.LATEST);
        } else {
            source = loadFromDb()
                    .subscribeOn(Schedulers.io())
                    .map(ApiResource::success);
        }

        result = Flowable.concat(initLoadDb()
                            .map(ApiResource::loading)
                            .take(1),
                            source)
                .subscribeOn(Schedulers.io());
    }

    public Observable<ApiResource<ResultType>> asObservable() {
        return result.toObservable();
    }

    @SuppressWarnings("WeakerAccess")
    protected void onFetchFailed(Throwable t) {
        Timber.e(t);
    }

    @WorkerThread
    protected void saveCallResult(@NonNull RawResponse response) {
        // By default, delegate persistence to the response object itself.
        response.saveResponseToDb(appDatabase);
    }

    @MainThread
    protected abstract boolean shouldFetch();

    @NonNull
    @MainThread
    protected abstract Flowable<ResultType> loadFromDb();

    @NonNull
    @MainThread
    protected abstract Observable<RawResponse> createCall();

    @NonNull
    @MainThread
    protected Flowable<ResultType> initLoadDb() {
        return loadFromDb();
    }
}

So here is what I decided on after many iterations. It is currently in production and working well for my app. Here are some takeaways:

  1. Create a BaseResponse interface

        public interface BaseResponse {
             void saveResponseToDb(AppDatabase appDatabase);
        }
    

    and implement it in all of your API response classes. This way you don't have to implement the save-to-database logic in every ApiRepositorySource; you can default to whatever the response's own implementation does, if you want.

  2. I have chosen to handle Retrofit error responses in the onErrorResumeNext block for simplicity, but I recommend you create a Transformer class that can hold all this logic. In this case, I added an extra Status enum value for ApiResources called INVALID for 400-level responses.

  3. You might be tempted to use the Reactive Streams architecture component library for LiveData

        implementation "android.arch.lifecycle:reactivestreams:$lifecycle_version"

    and add a method to this class called

        public LiveData<ApiResource<ResultType>> asLiveData() {
             return LiveDataReactiveStreams.fromPublisher(result);
        }
    

    In theory, this would work perfectly: our ViewModels wouldn't have to convert Observable emissions to LiveData emissions, and our Views wouldn't have to implement lifecycle logic for Observables. Unfortunately, this stream gets rebuilt on every configuration change, because the LiveData is disposed of on every onDestroy call (whether isFinishing is true or false). So we either have to manage the lifecycle of the stream ourselves, which defeats the purpose of using it in the first place, or accept duplicated calls every time the device rotates.
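
As a rough illustration of note 2, the 400-level check can be pulled out of the onErrorResumeNext lambda into a small helper that a Transformer class could later call. This is a plain-Java sketch under stated assumptions: HttpError is a hypothetical stand-in for Retrofit's HttpException so the snippet runs without Retrofit, and ErrorStatus and ErrorClassifier are made-up names, not part of the class above.

```java
/** Hypothetical stand-in for Retrofit's HttpException (assumption, not the real class). */
final class HttpError extends RuntimeException {
    private final int code;

    HttpError(int code, String message) {
        super(message);
        this.code = code;
    }

    int code() {
        return code;
    }
}

/** Hypothetical subset of the status values an ApiResource might carry on failure. */
enum ErrorStatus { ERROR, INVALID }

/** Keeps the "which kind of failure is this?" decision in one place. */
final class ErrorClassifier {
    private ErrorClassifier() {}

    static ErrorStatus classify(Throwable t) {
        if (t instanceof HttpError) {
            int code = ((HttpError) t).code();
            // 400-level responses mean the request itself was rejected.
            if (code >= 400 && code < 500) {
                return ErrorStatus.INVALID;
            }
        }
        // Everything else (5xx, I/O failures, parsing errors) is a generic error.
        return ErrorStatus.ERROR;
    }
}
```

With a helper like this, the lambda only has to map the returned status to the matching ApiResource factory method, and the status-code rules can be unit tested without a stream.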

Here is an example of a UserRepository creating an instance of an ApiRepositorySource:

@Singleton
public class UserRepository {

    private final RetrofitApi retrofitApi;
    private final AppDatabase appDatabase;

    @Inject
    UserRepository(RetrofitApi retrofitApi, AppDatabase appDatabase) {
        this.retrofitApi = retrofitApi;
        this.appDatabase = appDatabase;
    }

    public Observable<ApiResource<User>> getUser(long userId) {
        return new ApiRepositorySource<UserResponse, User>(appDatabase) {

            @Override
            protected boolean shouldFetch() {
                return true;
            }

            @NonNull
            @Override
            protected Flowable<User> loadFromDb() {
                return appDatabase.userDao().getUserFlowable(userId);
            }

            @NonNull
            @Override
            protected Observable<UserResponse> createCall() {
                return retrofitApi.getUserById(userId);
            }
        }.asObservable();
    }

}
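
The repository references a UserResponse class that is not shown. A minimal sketch of what such a response could look like when it implements BaseResponse follows. Everything here is an assumption for illustration: AppDatabase is replaced by a tiny in-memory stand-in so the snippet runs without Room, and the fields of UserResponse are invented.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the Room AppDatabase (assumption). */
final class AppDatabase {
    final Map<Long, String> userNamesById = new HashMap<>();
}

/** Same shape as the BaseResponse interface described in note 1. */
interface BaseResponse {
    void saveResponseToDb(AppDatabase appDatabase);
}

/** Hypothetical response body; the id and name fields are invented for the example. */
final class UserResponse implements BaseResponse {
    final long id;
    final String name;

    UserResponse(long id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public void saveResponseToDb(AppDatabase appDatabase) {
        // In a real app this would call a DAO insert; here it writes to the map.
        appDatabase.userNamesById.put(id, name);
    }
}
```

Because the response knows how to persist itself, the default saveCallResult can stay a one-line delegation and each repository method only overrides shouldFetch, loadFromDb and createCall.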
flopshot answered Dec 28 '22 05:12