
RxJava error handling in chain API calls

I have been playing with RxJava recently, trying to implement a chain of events (API calls/database operations), and seem to have hit a roadblock when it comes to handling errors.

This is what I am trying to do: I am calling an API that checks whether a user exists in the database. Based on the response I get, I am trying to chain a few sequences using RxJava. The following diagram might explain it a little better.

                          checkUser()
                         /          \
                       No           Yes
                       /              \
            createUserRemote()       FetchUserNotesRemote()
                      |                    |
                    End               SaveUserNotesLocal()
                                            |
                                           End

I am able to chain together checkUser() -> FetchUserNotesRemote() -> SaveUserNotesLocal() sequence with the following code.

checkUser()
            .flatMap(id -> {return fetchData(id);})
            .flatMap(notesResponseObject -> {return saveFetchedData(notesResponseObject);})
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onSuccess(Integer integer) {
                    //handle onsuccess here
                }

                @Override
                public void onError(Throwable e) {
                    //handle errors here
                }
            });

These are the issues I am mainly trying to solve:

  • I can't figure out how to handle the case where checkUser() returns a 404 HTTP status. When that happens, the subscriber's onError method gets called, which seems to me to be the expected behavior. How can I handle it so that when I get a 404 response from the API, instead of executing FetchUserNotesRemote() and SaveUserNotesLocal(), I execute a different chain of events?
  • Another thing I am not sure about: if an error occurs in any of the observables in a chain, how does the subscriber's onError method know which observable raised it?
Subayyal Mustafvi asked Apr 04 '18 05:04


1 Answer

1) To execute a different chain of observables on error, you can use the onErrorResumeNext() operator. More info here: github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

Example:

checkUser().flatMap(id -> {return fetchData(id);})
           .flatMap(notesResponseObject -> {return saveFetchedData(notesResponseObject);})
           .onErrorResumeNext(throwable -> { return doSomethingDifferent(); })
           .subscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onSuccess(Integer integer) {
                    //handle onsuccess here
                }

                @Override
                public void onError(Throwable e) {
                    //handle errors here
                }
            });
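
Note that an onErrorResumeNext() placed at the end of the chain resumes on any error from any stage, not only on a 404 from checkUser(). To branch only on "user not found", the lambda has to inspect the throwable first. Below is a minimal sketch of such a check. HttpStatusException is a hypothetical stand-in for whatever error type your HTTP client emits (with Retrofit, for instance, the comparable type is retrofit2.HttpException with its code() method), and isNotFound() is an illustrative helper name, not part of RxJava.

```java
final class NotFoundCheck {

    /** Hypothetical stand-in for the HTTP client's error type (an assumption, not from the question). */
    static class HttpStatusException extends RuntimeException {
        private final int code;

        HttpStatusException(int code) {
            super("HTTP " + code);
            this.code = code;
        }

        int code() {
            return code;
        }
    }

    /** Returns true only when the failure is an HTTP 404, i.e. the user does not exist. */
    static boolean isNotFound(Throwable t) {
        return t instanceof HttpStatusException
                && ((HttpStatusException) t).code() == 404;
    }

    public static void main(String[] args) {
        System.out.println(isNotFound(new HttpStatusException(404)));      // true
        System.out.println(isNotFound(new HttpStatusException(500)));      // false
        System.out.println(isNotFound(new java.io.IOException("timeout"))); // false
    }
}
```

With such a helper, the resume lambda could become `throwable -> isNotFound(throwable) ? createUserRemote() : Single.<Integer>error(throwable)`, assuming createUserRemote() returns a Single<Integer> like the rest of the chain. Any other error is passed on and still reaches onError().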

2) If an exception is thrown anywhere in your stream, it is passed down to the subscriber's onError(). If you want to know in which part of the stream the error was thrown, you can add an onErrorResumeNext() call for each API call that replaces the error with a stage-specific exception.

    checkUser()
           .onErrorResumeNext(throwable -> Single.error(new CheckUserException()))
           // Wrap each inner call separately: an operator placed after flatMap
           // would also re-wrap errors coming from the earlier stages.
           .flatMap(id -> fetchData(id)
                   .onErrorResumeNext(throwable -> Single.error(new FetchDataException())))
           .flatMap(notesResponseObject -> saveFetchedData(notesResponseObject)
                   .onErrorResumeNext(throwable -> Single.error(new SaveDataException())))
           .subscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onSuccess(Integer integer) {
                    //handle onsuccess here
                }

                @Override
                public void onError(Throwable e) {
                    //handle errors here
                }
            });
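
Once each stage emits its own exception type, onError() can tell the stages apart with a plain instanceof check. The following is a minimal sketch of that dispatch, assuming the three exception classes named above are simple user-defined types; failedStage() is a hypothetical helper name introduced only for illustration.

```java
final class StageErrors {

    // Stage-specific exception types, named as in the answer above.
    // Their exact definition is an assumption; any Throwable subclass works.
    static class CheckUserException extends RuntimeException {}
    static class FetchDataException extends RuntimeException {}
    static class SaveDataException extends RuntimeException {}

    /** Maps the throwable received in onError() to the stage that failed. */
    static String failedStage(Throwable e) {
        if (e instanceof CheckUserException) {
            return "checkUser";
        }
        if (e instanceof FetchDataException) {
            return "fetchData";
        }
        if (e instanceof SaveDataException) {
            return "saveFetchedData";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        System.out.println(failedStage(new FetchDataException()));    // fetchData
        System.out.println(failedStage(new IllegalStateException())); // unknown
    }
}
```

Inside the subscriber, onError(Throwable e) would call failedStage(e) (or inline the same checks) and react per stage, for example by showing a different message for a failed local save than for a failed network call.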
mol answered Nov 14 '22 23:11