Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Making N sequential api calls using RxJava and Retrofit

I have a list of files that I'd like to upload to the backend from an Android device. Due to memory constraints, I'd like to make the second API call only after the first finished, the third after the second finished, and so on.

I wrote something like

private Observable<Integer> uploadFiles(List<File> files) {
        return Observable.create(subscriber -> {
            for (int i = 0, size = files.size(); i < size; i++) {
                UploadModel uploadModel = new UploadModel(files.get(0));
                int uploadResult = retrofitApi.uploadSynchronously(uploadModel);
                subscriber.onNext(uploadResult);
            }
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.newThread());
    }

But I feel like this might be going against the spirit of Rx, and the saying is if you're using Observable.create, you're probably doing it wrong... Is this a reasonable approach? Is there a better way to achieve this with Retrofit's RxJava integration?

like image 905
Major Laslo Avatar asked Aug 31 '15 17:08

Major Laslo


People also ask

How do you use RxJava with retrofit?

RxAndroid is an extension of RxJava and it contains the Android threads to be used in the Android Environment. To use RxJava in retrofit environment we need to do just two major changes: Add the RxJava in Retrofit Builder. Use Observable type in the interface instead of Call.

How do I retrofit an API call?

If we want to consume the API asynchronously, we call the service as follows: String username = "sarahjean"; Call<User> call = apiService. getUser(username); call. enqueue(new Callback<User>() { @Override public void onResponse(Call<User> call, Response<User> response) { int statusCode = response.

What is the difference between coroutines and RxJava?

Coming to RxJava, RxJava streams are prone to leaks, where a stream continues to process items even when you no longer care. Kotlin coroutines use structured concurrency, which makes it much easier to manage the lifecycle of all your concurrent code.


1 Answers

Naively, I would do that (it does not work, though, see below):

return Observable.from(files).concatMap(file -> retrofitApi.upload(uploadModel));

Now the issue is that there is no way to tell retrofit to use only one thread for those calls.

reduce, however, passes the result of one function call to the next, along with the next emitted value from the original observable. That would work, but the function passed to reduce needs to be synchronous. Not good.

Another approach would be to modify the observable recursively:

void getNextFile(int i) {
    return retrofit.upload(i).
        onNext(result -> getNextFile(i + 1));
}

roughly. But I am not sure how to clean it to make it more readable.

The cleanest I would think would be something like:

Observable.from(files).map(file -> retrofitApi.uploadSynchronously(new UploadModel(file)));
like image 116
njzk2 Avatar answered Sep 23 '22 18:09

njzk2