I have been playing with RxJava recently, trying to implement a chain of events (API calls/database operations), and seem to have hit a roadblock when it comes to handling errors.
This is what I am trying to do: I call an API that checks whether a user exists in the database. Based on the response I get, I want to chain a few sequences using RxJava. The following diagram might explain it a little better.
                  checkUser()
                  /         \
                No           Yes
                /             \
  createUserRemote()    FetchUserNotesRemote()
          |                     |
         End            SaveUserNotesLocal()
                                |
                               End
I am able to chain together checkUser() -> FetchUserNotesRemote() -> SaveUserNotesLocal() sequence with the following code.
checkUser()
    .flatMap(id -> fetchData(id))
    .flatMap(notesResponseObject -> saveFetchedData(notesResponseObject))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(Integer integer) {
            // handle onSuccess here
        }

        @Override
        public void onError(Throwable e) {
            // handle errors here
        }
    });
The issue I am mainly trying to solve.
1) To execute a different chain of observables on error, you can use the onErrorResumeNext() operator. More info here: github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators
Example:
checkUser()
    .flatMap(id -> fetchData(id))
    .flatMap(notesResponseObject -> saveFetchedData(notesResponseObject))
    // If any step above fails, switch to a fallback chain instead of calling onError
    .onErrorResumeNext(throwable -> doSomethingDifferent())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(Integer integer) {
            // handle onSuccess here
        }

        @Override
        public void onError(Throwable e) {
            // handle errors here
        }
    });
2) If an exception is thrown anywhere in your stream, it is passed down to the subscriber's onError(). If you want to know in which part of the stream the error was thrown, you can add multiple onErrorResumeNext() calls, each of which rethrows a specific exception right after the corresponding API call.
checkUser()
    // Single.error (not Observable.error), since the chain is subscribed with a SingleObserver
    .onErrorResumeNext(throwable -> Single.error(new CheckUserException()))
    .flatMap(id -> fetchData(id))
    .onErrorResumeNext(throwable -> Single.error(new FetchDataException()))
    .flatMap(notesResponseObject -> saveFetchedData(notesResponseObject))
    .onErrorResumeNext(throwable -> Single.error(new SaveDataException()))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(Integer integer) {
            // handle onSuccess here
        }

        @Override
        public void onError(Throwable e) {
            // handle errors here
        }
    });
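Inside onError() you can then tell the stages apart by checking the exception type. Below is a rough, library-free sketch of that dispatch. The three exception classes are only named in the snippet above, so their definitions here are my assumption, and ErrorStage.describe() is a hypothetical helper, not part of RxJava.

```java
// Hypothetical helper: maps the Throwable received in onError() to the stage that failed.
final class ErrorStage {
    private ErrorStage() {}

    static String describe(Throwable e) {
        if (e instanceof CheckUserException) {
            return "checkUser failed";
        }
        if (e instanceof FetchDataException) {
            return "fetchData failed";
        }
        if (e instanceof SaveDataException) {
            return "saveFetchedData failed";
        }
        // Anything that was not wrapped by one of the onErrorResumeNext() calls
        return "unexpected error";
    }

    public static void main(String[] args) {
        // prints "fetchData failed"
        System.out.println(describe(new FetchDataException()));
    }
}

// Assumed definitions of the exception types named in the chain above.
class CheckUserException extends Exception {
    CheckUserException() { super(); }
    CheckUserException(Throwable cause) { super(cause); }
}

class FetchDataException extends Exception {
    FetchDataException() { super(); }
    FetchDataException(Throwable cause) { super(cause); }
}

class SaveDataException extends Exception {
    SaveDataException() { super(); }
    SaveDataException(Throwable cause) { super(cause); }
}
```

In the subscriber you would call something like ErrorStage.describe(e) from onError(Throwable e). If you also want to keep the original failure, you could pass it along as the cause, e.g. new FetchDataException(throwable), and read it back later with getCause().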