My current Android Application employs Retrofit
and RxJava
to choreograph my network calls.
I have modelled my HTTP GET(s) as Single<Response<String>>
and POST(s) as Completable
.
The sequence of calls that I require are as follows:-
Sequentially call GET(1), GET(2), GET(3)
Parallel call POST(1), POST(2)
When both POST(1) & POST(2) have completed OK, call GET(4).
I have a partial solution. I have coded calls for the the First three GET(s) followed by the POST calls
My code resembles this:-
Single.concat(getRequests())
.subscribeOn(Schedulers.single())
.doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
.doFinally(new Action() {
@Override
public void run() throws Exception {
manageExecutions(combineExecutions());
}
})
.subscribe();
/**
* @return
*/
private static Iterable<Single<Response<String>>> getRequests() {
final API_CALL_GET[] apiCalls = API_CALL_GET.values();
final List<Single<Response<String>>> requests = new ArrayList<>(apiCalls.length);
for (final API_CALL_GET apiCall : apiCalls) {
requests.add(apiCall.request());
}
return requests;
}
public enum API_CALL_GET {
GET_ONE {
@Override
public Single<Response<String>> request() {
return RETRO_SERVICE
.getOne(authToken, new HashMap<>())
.doAfterSuccess(this::persistDataOne)
.doOnError(error -> ever(error));
}
}, GET_TWO {
@Override
public Single<Response<String>> request() {
return RETRO_SERVICE
.getTwo(authToken, new HashMap<>())
.doAfterSuccess(this::persistDataTwo)
.doOnError(error -> ever(error));
}
},
GET_THREE {
@Override
public Single<Response<String>> request() {
return RETRO_SERVICE
.getThree(authToken, new HashMap<>())
.doAfterSuccess(this::persistDataThree)
.doOnError(error -> ever(error));
}
};
public abstract Single<Response<String>> request();
}
private static Action manageExecutions(final List<Completable> completables) {
return new Action() {
@Override
public void run() throws Exception {
Completable
.concat(completables)
.subscribeOn(Schedulers.io())
.doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
accumulateAmounts();
}
})
.subscribe();
}
};
}
/**
* @return
*/
private static List<Completable> combineExecutions() {
final API_CALL_POST[] apiCalls = API_CALL_POST.values();
final List<Completable> requests = new ArrayList<>(apiCalls.length);
for (final API_CALL_POST apiCall : apiCalls) {
requests.addAll(apiCall.requests());
}
return Lists.newArrayList(Iterables.unmodifiableIterable(requests));
}
public enum API_CALL_POST {
POST_ONE {
@Override
public List<Completable> requests() {
return NetworkController.postRecommenderExecutions();
}
},
POST_TWO {
@Override
public List<Completable> requests() {
return NetworkController.postSavedSearcheExecutions();
}
};
public abstract List<Completable> requests();
}
public static List<Completable> postONE() {
final List<Completable> completables = new ArrayList<>();
final List<OneDO> oneDOS = fetchOnes();
for (final OneDO oneDO : oneDOS) {
completables.add(RETRO_SERVICE.runCompletableOnes(authToken, oneDO.getId())
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(final Throwable throwable) throws Exception {
Log.e(TAG, "accept: ", throwable);
}
}));
}
return completables;
}
public static List<Completable> postTWO() {
final List<Completable> completables = new ArrayList<>();
final List<TwoDO> twoDOS = fetchTwos();
for (final TwoDO twoDO : twoDOS) {
completables.add(RETRO_SERVICE.runCompletableTwos(authToken, twoDO.getId())
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(final Throwable throwable) throws Exception {
Log.e(TAG, "accept: ", throwable);
}
}));
}
return completables;
}
What I am having difficulty with is correctly chaining my calls
e.g. I thought I would be able to develop a solution that resembled this pseudo code
Single.concat(GET_1... GET_N).onComplete(POST_1... POST_N).onComplete(GET_LAST)
however my current partial solution only calls the First group of GET(s) followed by the POST(s) and the GET and POST calls are not "chained"
I cannot see how to create a chain of calls that support my use case.
Is it possible to combine Single
-> Completable
-> Single
in a chained call?
UPDATE
Based on Daniil answer I ended up with this solution:-
Single.concat(getRequests())
.subscribeOn(Schedulers.io())
.doOnError(throwable -> Log.e(TAG, "accept[0000]: ", throwable))
.ignoreElements()
.andThen(Completable.merge(combineExecutions()))
.doOnError(throwable -> Log.e(TAG, "accept: ", throwable))
.doOnComplete(() -> Controller.accumulateTotals())
.subscribe();
The RxJS merge() operator is a join operator that is used to turn multiple observables into a single observable. It creates an output Observable, which concurrently emits all values from every given input Observables.
Single is an Observable which only emits one item or throws an error. Single emits only one value and applying some of the operator makes no sense.
RxJava (and its derivatives like RxGroovy & RxScala) has developed an Observable variant called “Single.” A Single is something like an Observable, but instead of emitting a series of values — anywhere from none at all to an infinite number — it always either emits one value or an error notification.
In kotlin it would look something like this:
fun generateGetRequests(): List<Single<Response<String>>> {
return listOf(retrofit.firstGet(), retrofit.secondGet(), ... ,retrofit.lastGet())
}
fun generatePostRequests(): List<Completable> {
return listOf(retrofit.firstPost(), ..., retrofit.lastPost())
}
fun doSomethingWithResponses(responses: Array<Any>) {
// Do Something, like save to db
}
fun runRequests() {
Single.zip(generateGetRequests(), { responses ->
doSomethingWithResponses(responses)
}).ignoreElements()
.andThen(Completable.merge(generatePostRequests()))
.subscribeOn(Schedulers.io())
.subscribe()
}
Chaining up different types can happen either by converting them onto a shared reactive type (such as Observable
) and concatenating, or using one of the continuation approaches via flatMapX
and andThen
:
someSingle
.flatMapCompletable(result1 -> {
sideEffect(result1);
return someCompletable;
})
.andThen(Single.defer(() -> {
sideEffectAfterCompletable();
return someOtherSingle;
}))
...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With