I have a list of files that I'd like to upload to the backend from an Android device. Due to memory constraints, I'd like to make the second API call only after the first finished, the third after the second finished, and so on.
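I wrote something like:
    private Observable<Integer> uploadFiles(List<File> files) {
        // The explicit <Integer> is needed: the chained subscribeOn() call
        // prevents Java from inferring the element type from the return type.
        return Observable.<Integer>create(subscriber -> {
            for (int i = 0, size = files.size(); i < size; i++) {
                // Upload the i-th file (the original always took files.get(0)).
                UploadModel uploadModel = new UploadModel(files.get(i));
                int uploadResult = retrofitApi.uploadSynchronously(uploadModel);
                subscriber.onNext(uploadResult);
            }
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.newThread());
    }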
But I feel like this might be going against the spirit of Rx, and the saying goes that if you're using Observable.create, you're probably doing it wrong. Is this a reasonable approach? Is there a better way to achieve this with Retrofit's RxJava integration?
Naively, I would do that (it does not work, though, see below):
    return Observable.from(files).concatMap(file -> retrofitApi.upload(new UploadModel(file)));
Now the issue is that there is no way to tell Retrofit to use only one thread for those calls.
reduce, however, passes the result of one function call to the next, along with the next emitted value from the original observable. That would work, but the function passed to reduce needs to be synchronous. Not good.
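To make the fold idea concrete, here is a minimal sketch that uses only java.util.concurrent rather than RxJava or Retrofit. The accumulator is itself a CompletableFuture, so each step can be asynchronous while the next upload still starts only after the previous one has finished. The class name, the upload method and the String file names are hypothetical stand-ins, not part of Retrofit's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SequentialUploadSketch {

    // Records when each upload starts and ends; for demonstration only.
    static final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for an asynchronous upload call (not Retrofit's API).
    static CompletableFuture<Integer> upload(String fileName) {
        return CompletableFuture.supplyAsync(() -> {
            log.add("start " + fileName);
            // ... a real network call would go here ...
            log.add("end " + fileName);
            return fileName.length(); // pretend result code
        });
    }

    // Folds the list into a single chain: upload(name) is only invoked once
    // the previous stage has completed, and its result is appended to the list.
    static CompletableFuture<List<Integer>> uploadAll(List<String> fileNames) {
        CompletableFuture<List<Integer>> chain =
                CompletableFuture.completedFuture(new ArrayList<>());
        for (String name : fileNames) {
            chain = chain.thenCompose(results ->
                    upload(name).thenApply(result -> {
                        results.add(result);
                        return results;
                    }));
        }
        return chain;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("a.jpg", "bb.jpg", "ccc.jpg");
        System.out.println(uploadAll(files).join());
        System.out.println(log);
    }
}
```

Because thenCompose defers the call to upload until the preceding stage completes, at most one upload is in flight at a time, which is the property the question asks for. Whether the same shape maps cleanly onto Retrofit's Observable-returning methods is not something this sketch settles.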
Another approach would be to modify the observable recursively:
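    // Sketch: assumes retrofitApi.upload(...) returns an Observable<Integer>.
    Observable<Integer> uploadFrom(List<File> files, int i) {
        if (i >= files.size()) return Observable.empty();
        // defer() delays building the next upload until this one completes.
        return retrofitApi.upload(new UploadModel(files.get(i)))
                .concatWith(Observable.defer(() -> uploadFrom(files, i + 1)));
    }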
roughly. But I am not sure how to clean it to make it more readable.
The cleanest approach, I think, would be something like:
    Observable.from(files).map(file -> retrofitApi.uploadSynchronously(new UploadModel(file)));