
How to combine RxJava Single & Completable Retrofit calls in Android Application

My current Android application uses Retrofit and RxJava to orchestrate its network calls.

I have modelled my HTTP GET(s) as Single<Response<String>> and POST(s) as Completable.

The sequence of calls that I require is as follows:

Sequentially call GET(1), GET(2), GET(3)

Parallel call POST(1), POST(2)

When both POST(1) & POST(2) have completed OK, call GET(4).
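
To make the required ordering concrete, here is a minimal sketch that uses only the JDK's CompletableFuture rather than RxJava. It is not the Retrofit code from this question: the get and post helpers and the CallOrdering class are hypothetical stand-ins that just record the order in which calls start.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class CallOrdering {

    // Hypothetical stand-in for a GET: records the call, then yields a body.
    static CompletableFuture<String> get(String name, List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            log.add(name);
            return name + "-body";
        });
    }

    // Hypothetical stand-in for a POST: records the call and yields nothing.
    static CompletableFuture<Void> post(String name, List<String> log) {
        return CompletableFuture.runAsync(() -> log.add(name));
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        get("GET1", log)
            // Each GET is only created once the previous one has completed.
            .thenCompose(r1 -> get("GET2", log))
            .thenCompose(r2 -> get("GET3", log))
            // Both POSTs are started together; allOf completes when both are done.
            .thenCompose(r3 -> CompletableFuture.allOf(
                    post("POST1", log), post("POST2", log)))
            // The final GET runs only after both POSTs have completed.
            .thenCompose(done -> get("GET4", log))
            .join();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The recorded log always starts with GET1, GET2, GET3 and ends with GET4; the two POST entries in between may appear in either order because they run concurrently.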

I have a partial solution. I have coded calls for the first three GETs followed by the POST calls.

My code resembles this:-

Single.concat(getRequests())
                .subscribeOn(Schedulers.single())
                .doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        // manageExecutions() only builds an Action; run it to start the POSTs
                        manageExecutions(combineExecutions()).run();
                    }
                })
                .subscribe();

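    /**
     * @return the GET requests in enum declaration order, so that
     *         Single.concat subscribes to them one after another
     */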
    private static Iterable<Single<Response<String>>> getRequests() {
        final API_CALL_GET[] apiCalls = API_CALL_GET.values();
        final List<Single<Response<String>>> requests = new ArrayList<>(apiCalls.length);

        for (final API_CALL_GET apiCall : apiCalls) {
            requests.add(apiCall.request());
        }

        return requests;
    }

public enum API_CALL_GET {

    GET_ONE {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getOne(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataOne)
                .doOnError(error -> ever(error));
        }
    }, GET_TWO {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getTwo(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataTwo)
                .doOnError(error -> ever(error));
        }
    },
    GET_THREE {
        @Override
        public Single<Response<String>> request() {
            return RETRO_SERVICE
                .getThree(authToken, new HashMap<>())
                .doAfterSuccess(this::persistDataThree)
                .doOnError(error -> ever(error));
        }
    };

    public abstract Single<Response<String>> request();

}


    private static Action manageExecutions(final List<Completable> completables) {

        return new Action() {
            @Override
            public void run() throws Exception {
                Completable
                .concat(completables)
                .subscribeOn(Schedulers.io())
                .doOnError(throwable -> Log.e(TAG, "Manage Totals Failed", throwable))
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        accumulateAmounts();
                    }
                })
                .subscribe();
            }
        };
    }


    /**
     * @return the POST Completables from every API_CALL_POST group,
     *         flattened into a single list
     */
    private static List<Completable> combineExecutions() {
        final API_CALL_POST[] apiCalls = API_CALL_POST.values();
        final List<Completable> requests = new ArrayList<>(apiCalls.length);

        for (final API_CALL_POST apiCall : apiCalls) {
            requests.addAll(apiCall.requests());
        }

        return Lists.newArrayList(Iterables.unmodifiableIterable(requests));
    }

public enum API_CALL_POST {

    POST_ONE {
        @Override
        public List<Completable> requests() {
            return NetworkController.postRecommenderExecutions();
        }
    },
    POST_TWO {
        @Override
        public List<Completable> requests() {
            return NetworkController.postSavedSearcheExecutions();
        }
    };

    public abstract List<Completable> requests();

}


    public static List<Completable> postONE() {
        final List<Completable> completables = new ArrayList<>();

        final List<OneDO> oneDOS = fetchOnes();

        for (final OneDO oneDO : oneDOS) {
            completables.add(RETRO_SERVICE.runCompletableOnes(authToken, oneDO.getId())
                    .doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(final Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: ", throwable);
                        }
                    }));
        }

        return completables;
    }




    public static List<Completable> postTWO() {
        final List<Completable> completables = new ArrayList<>();

        final List<TwoDO> twoDOS = fetchTwos();

        for (final TwoDO twoDO : twoDOS) {
            completables.add(RETRO_SERVICE.runCompletableTwos(authToken, twoDO.getId())
                    .doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(final Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: ", throwable);
                        }
                    }));
        }

        return completables;
    }

What I am having difficulty with is correctly chaining my calls.

For example, I thought I would be able to develop a solution that resembled this pseudocode:

Single.concat(GET_1... GET_N).onComplete(POST_1... POST_N).onComplete(GET_LAST)

However, my current partial solution only calls the first group of GETs followed by the POSTs, and the GET and POST calls are not "chained".

I cannot see how to create a chain of calls that supports my use case.

Is it possible to combine Single -> Completable -> Single in a chained call?

UPDATE

Based on Daniil's answer, I ended up with this solution:

 Single.concat(getRequests())
                .subscribeOn(Schedulers.io())
                .doOnError(throwable -> Log.e(TAG, "accept[0000]: ", throwable))
                .ignoreElements()
                .andThen(Completable.merge(combineExecutions()))
                .doOnError(throwable -> Log.e(TAG, "accept: ", throwable))
                .doOnComplete(() -> Controller.accumulateTotals())
                .subscribe();

Hector asked Aug 10 '18 07:08


2 Answers

In Kotlin it would look something like this:

fun generateGetRequests(): List<Single<Response<String>>> {
    return listOf(retrofit.firstGet(), retrofit.secondGet(), ... ,retrofit.lastGet())
}

fun generatePostRequests(): List<Completable> {
    return listOf(retrofit.firstPost(), ..., retrofit.lastPost())
}

fun doSomethingWithResponses(responses: Array<Any>) {
    // Do Something, like save to db
}

fun runRequests() {
    Single.zip(generateGetRequests(), { responses ->
        doSomethingWithResponses(responses)
    }).ignoreElement() // Single has ignoreElement(); ignoreElements() belongs to Observable and Flowable
        .andThen(Completable.merge(generatePostRequests()))
        .subscribeOn(Schedulers.io())
        .subscribe()
}

Daniil answered Oct 16 '22 08:10


Chaining different reactive types can be done either by converting them into a shared reactive type (such as Observable) and concatenating, or by using one of the continuation approaches via flatMapX and andThen:

 someSingle
 .flatMapCompletable(result1 -> {
     sideEffect(result1);
     return someCompletable;
 })
 .andThen(Single.defer(() -> {
     sideEffectAfterCompletable();
     return someOtherSingle;
 }))
 ...

akarnokd answered Oct 16 '22 08:10