RxJava onErrorResumeNext()

I have two observables (named A and B for simplicity) and one subscriber. The subscriber subscribes to A, and if A fails, B (the fallback) kicks in. Whenever A hits an error, B is invoked as expected; however, A calls onCompleted() on the subscriber first, so B's response never reaches the subscriber even when B executes successfully.

Is this the normal behaviour? I thought onErrorResumeNext() should continue the stream and notify the subscriber only once the fallback has completed, as described in the documentation (https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#onerrorresumenext).

This is the overall structure of what I'm doing (several "boring" parts omitted):

public Observable<ModelA> observeGetAPI(){
    return retrofitAPI.getObservableAPI1()
            .flatMap(observableApi1Response -> {
                ModelA model = new ModelA();

                model.setApi1Response(observableApi1Response);

                return retrofitAPI.getObservableAPI2()
                        .map(observableApi2Response -> {
                            // Blah blah blah...
                            return model;
                        })
                        .onErrorResumeNext(observeGetAPIFallback(model))
                        .subscribeOn(Schedulers.newThread());
            })
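            .onErrorReturn(throwable -> {
                // Blah blah blah...
                // NOTE: the "model" created inside the flatMap lambda is not in scope
                // here, so a placeholder instance is returned instead (an assumption).
                return new ModelA();
            })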
            .subscribeOn(Schedulers.newThread());
}

private Observable<ModelA> observeGetAPIFallback(ModelA model){
    return retrofitAPI.getObservableAPI3().map(observableApi3Response -> {
        // Blah blah blah...
        return model;
    }).onErrorReturn(throwable -> {
        // Blah blah blah...
        return model;
    })
    .subscribeOn(Schedulers.immediate());
}

Subscription subscription;
subscription = observeGetAPI().subscribe(modelA -> {
    // IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE...
}, throwable -> {
    // WE NEVER GET HERE... onErrorResumeNext()
}, () -> {
    // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED
});

Any ideas what I'm doing wrong?

Thanks!

EDIT: Here's a rough timeline of what's happening:

---> HTTP GET REQUEST B
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS)

---> HTTP GET REQUEST A
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!)

---> HTTP GET FALLBACK A
** onCompleted() called! ---> Subscriber never gets the fallback response since onCompleted() is called too early.
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS)

And here's a link to a simple diagram I made which represents what I want to happen: Diagram

mradzinski asked Sep 03 '14 00:09

1 Answer

The Rx calls below should simulate what you are doing with Retrofit.

Observable<String> fallbackObservable =
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        logger.v("emitting A Fallback");
                        subscriber.onNext("A Fallback");
                        subscriber.onCompleted();
                    }
                })
                .delay(1, TimeUnit.SECONDS)
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        logger.v("emitting Fallback Error");
                        return "Fallback Error";
                    }
                })
                .subscribeOn(Schedulers.immediate());

Observable<String> stringObservable =
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        logger.v("emitting B");
                        subscriber.onNext("B");
                        subscriber.onCompleted();
                    }
                })
                .delay(1, TimeUnit.SECONDS)
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String s) {
                        logger.v("flatMapping B");
                        return Observable
                                .create(new Observable.OnSubscribe<String>() {
                                    @Override
                                    public void call(Subscriber<? super String> subscriber) {
                                        logger.v("emitting A");
                                        subscriber.onNext("A");
                                        subscriber.onCompleted();
                                    }
                                })
                                .delay(1, TimeUnit.SECONDS)
                                .map(new Func1<String, String>() {
                                    @Override
                                    public String call(String s) {
                                        logger.v("A completes but contains invalid data - throwing error");
                                        throw new NotImplementedException("YUCK!");
                                    }
                                })
                                .onErrorResumeNext(fallbackObservable)
                                .subscribeOn(Schedulers.newThread());
                    }
                })
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        logger.v("emitting Return Error");
                        return "Return Error";
                    }
                })
                .subscribeOn(Schedulers.newThread());

Subscription subscription = stringObservable.subscribe(
        new Action1<String>() {
            @Override
            public void call(String s) {
                logger.v("onNext " + s);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                logger.v("onError");
            }
        },
        new Action0() {
            @Override
            public void call() {
                logger.v("onCompleted");
            }
        });

The output from the log statements is:

RxNewThreadScheduler-1 emitting B
RxComputationThreadPool-1 flatMapping B
RxNewThreadScheduler-2 emitting A
RxComputationThreadPool-2 A completes but contains invalid data - throwing error
RxComputationThreadPool-2 emitting A Fallback
RxComputationThreadPool-1 onNext A Fallback
RxComputationThreadPool-1 onCompleted

This seems like what you are looking for but maybe I'm missing something.
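
For reference, the event order in that log (the fallback's onNext first, then a single onCompleted, and no onError) can be reproduced without RxJava. What follows is only a minimal, synchronous sketch of the resume-on-error idea, with no schedulers or delays. The names ResumeOnErrorSketch, Source, Observer and resumeOnError are hypothetical and are not RxJava API; the sketch does not show how RxJava implements onErrorResumeNext() internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in types for illustration only; none of this is RxJava API.
class ResumeOnErrorSketch {

    /** Minimal observer contract: zero or more onNext, then one terminal event. */
    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** A source pushes events to whichever observer subscribes. */
    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    /** Wraps primary so that its onError is replaced by a subscription to fallback. */
    static <T> Source<T> resumeOnError(Source<T> primary, Source<T> fallback) {
        return downstream -> primary.subscribe(new Observer<T>() {
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onError(Throwable error) {
                // Swallow the error; the fallback now drives the downstream observer,
                // including the terminal onCompleted.
                fallback.subscribe(downstream);
            }
            @Override public void onCompleted() { downstream.onCompleted(); }
        });
    }

    /** Runs the scenario and returns the events the subscriber saw, in order. */
    static List<String> run() {
        // "A" fails immediately; "A Fallback" emits one item and completes.
        Source<String> failingA = o -> o.onError(new IllegalStateException("invalid data"));
        Source<String> fallbackA = o -> { o.onNext("A Fallback"); o.onCompleted(); };

        List<String> events = new ArrayList<>();
        resumeOnError(failingA, fallbackA).subscribe(new Observer<String>() {
            @Override public void onNext(String item) { events.add("onNext " + item); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints "onNext A Fallback" followed by "onCompleted", the same last two events as in the log above. If the subscriber sees onCompleted before the fallback item, the completion is likely coming from somewhere other than the resume-on-error step itself.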

kjones answered Oct 07 '22 18:10