I have been playing with RxJava recently, trying to implement a chain of events (API calls/database operations), and seem to have hit a roadblock when it comes to handling errors.
This is what I am trying to do. I am calling an API that checks whether a user exists in the database. Based on the response, I am trying to chain a few sequences using RxJava. The following diagram might explain it a little better.
            checkUser()
             /       \
           No         Yes
           /            \
createUserRemote()   FetchUserNotesRemote()
        |                   |
       End           SaveUserNotesLocal()
                            |
                           End
I am able to chain together the checkUser() -> FetchUserNotesRemote() -> SaveUserNotesLocal() sequence with the following code.
    checkUser()
            .flatMap(id -> fetchData(id))
            .flatMap(notesResponseObject -> saveFetchedData(notesResponseObject))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onSuccess(Integer integer) {
                    // handle success here
                }

                @Override
                public void onError(Throwable e) {
                    // handle errors here
                }
            });
The issue I am mainly trying to solve.
1) To execute a different chain of observables when an error occurs, you can use the onErrorResumeNext() operator. More info here: github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators
Example:
    checkUser()
            .flatMap(id -> fetchData(id))
            .flatMap(notesResponseObject -> saveFetchedData(notesResponseObject))
            // On any upstream error, switch to a different source
            .onErrorResumeNext(throwable -> doSomethingDifferent())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onSuccess(Integer integer) {
                    // handle success here
                }

                @Override
                public void onError(Throwable e) {
                    // handle errors here
                }
            });
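Here is a minimal, self-contained sketch of that idea, in case it helps to see it run. It assumes RxJava 2 (with 3.x the import would be io.reactivex.rxjava3.core.Single) and that checkUser() signals a missing user through an error. UserNotFoundException and all method bodies are hypothetical stand-ins, not your real API. The fallback only runs for the "user not found" case and passes every other error through, so failures in fetchData() or saveFetchedData() still reach onError().

```java
import io.reactivex.Single;

class FallbackSketch {
    // Hypothetical marker exception meaning "user does not exist".
    static class UserNotFoundException extends RuntimeException {}

    // Hypothetical stand-ins for the real API and database calls.
    static Single<Integer> checkUser(boolean exists) {
        if (exists) {
            return Single.just(42);
        }
        return Single.error(new UserNotFoundException());
    }

    static Single<String> fetchData(int id) {
        return Single.just("notes-for-" + id);
    }

    static Single<Integer> saveFetchedData(String notes) {
        return Single.just(notes.length());
    }

    static Single<Integer> createUserRemote() {
        return Single.just(-1);
    }

    static int run(boolean exists) {
        return checkUser(exists)
                .flatMap(id -> fetchData(id))
                .flatMap(notes -> saveFetchedData(notes))
                .onErrorResumeNext(t -> {
                    // Only the "no such user" case switches to the other branch.
                    if (t instanceof UserNotFoundException) {
                        return createUserRemote();
                    }
                    // Anything else is re-emitted unchanged.
                    return Single.<Integer>error(t);
                })
                .blockingGet(); // blocking only to keep the sketch short
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // 12 (length of "notes-for-42")
        System.out.println(run(false)); // -1 (value from createUserRemote)
    }
}
```

In an Android app you would keep subscribeOn()/observeOn() and subscribe() instead of blockingGet().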
2) If an exception is thrown anywhere in your stream, it is passed down to the subscriber's onError(). If you want to know which part of the stream threw the error, you can add multiple onErrorResumeNext() calls that throw a specific exception, one for each API call.
    // Each operator is attached to its own source. Chained at the top level instead,
    // a later onErrorResumeNext() would also catch and replace the exception
    // produced by an earlier one.
    checkUser()
            .onErrorResumeNext(throwable -> Single.error(new CheckUserException()))
            .flatMap(id -> fetchData(id)
                    .onErrorResumeNext(throwable -> Single.error(new FetchDataException())))
            .flatMap(notesResponseObject -> saveFetchedData(notesResponseObject)
                    .onErrorResumeNext(throwable -> Single.error(new SaveDataException())))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onSuccess(Integer integer) {
                    // handle success here
                }

                @Override
                public void onError(Throwable e) {
                    // handle errors here
                }
            });
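To check that each stage really surfaces its own exception type, here is a small runnable sketch. It assumes RxJava 2 and defines the three exception classes as plain RuntimeException subclasses. The failAt parameter and the method bodies are made up purely to simulate a failure at a chosen stage. Each onErrorResumeNext() wraps only the source it belongs to, so an exception tagged at one stage is not replaced by a later stage.

```java
import io.reactivex.Single;
import java.io.IOException;

class StageErrorSketch {
    // Assumed definitions; the real classes could carry the original cause.
    static class CheckUserException extends RuntimeException {}
    static class FetchDataException extends RuntimeException {}
    static class SaveDataException extends RuntimeException {}

    // Hypothetical stand-ins that fail on demand.
    static Single<Integer> checkUser(String failAt) {
        if (failAt.equals("check")) {
            return Single.error(new IOException("check failed"));
        }
        return Single.just(7);
    }

    static Single<String> fetchData(int id, String failAt) {
        if (failAt.equals("fetch")) {
            return Single.error(new IOException("fetch failed"));
        }
        return Single.just("notes-" + id);
    }

    static Single<Integer> saveFetchedData(String notes, String failAt) {
        if (failAt.equals("save")) {
            return Single.error(new IOException("save failed"));
        }
        return Single.just(1); // pretend one row was saved
    }

    static Single<Integer> pipeline(String failAt) {
        return checkUser(failAt)
                .onErrorResumeNext(t -> Single.error(new CheckUserException()))
                .flatMap(id -> fetchData(id, failAt)
                        .onErrorResumeNext(t -> Single.error(new FetchDataException())))
                .flatMap(notes -> saveFetchedData(notes, failAt)
                        .onErrorResumeNext(t -> Single.error(new SaveDataException())));
    }

    // Turns the outcome into a string so the failing stage is easy to inspect.
    static String describe(String failAt) {
        return pipeline(failAt)
                .map(count -> "saved " + count)
                .onErrorReturn(e -> e.getClass().getSimpleName())
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(describe("none"));  // saved 1
        System.out.println(describe("check")); // CheckUserException
        System.out.println(describe("fetch")); // FetchDataException
        System.out.println(describe("save"));  // SaveDataException
    }
}
```

In onError() you can then branch with instanceof checks on the Throwable to decide how to react to each stage.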