I am learning RxJava (I am completely new to it, so sorry if this question is too basic), and I am having difficulty with its error-handling mechanism. I have gone through the docs, but they are too advanced for me to grasp.
This is my code:
public static void main(String[] args) {
    Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Upma", "Idly");
    Observer<String> myObserver = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            // do nothing with Disposable, disregard for now
        }

        @Override
        public void onNext(String value) {
            System.out.println("RECEIVED: " + value);
            throw new RuntimeException("I am thrown");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("I got an error !");
            e.printStackTrace(new PrintStream(System.out));
        }

        @Override
        public void onComplete() {
            System.out.println("Done!");
        }
    };
    source.subscribe(myObserver);
}
And this is my stack trace:
RECEIVED: Alpha
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: I am thrown
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
at io.reactivex.Observable.subscribe(Observable.java:12275)
at reactivex.ReactMain.main(ReactMain.java:36)
Caused by: java.lang.RuntimeException: I am thrown
at reactivex.ReactMain$1.onNext(ReactMain.java:22)
at reactivex.ReactMain$1.onNext(ReactMain.java:1)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:108)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:37)
at io.reactivex.Observable.subscribe(Observable.java:12268)
... 1 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: I am thrown
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
at io.reactivex.Observable.subscribe(Observable.java:12275)
at reactivex.ReactMain.main(ReactMain.java:36)
Caused by: java.lang.RuntimeException: I am thrown
at reactivex.ReactMain$1.onNext(ReactMain.java:22)
at reactivex.ReactMain$1.onNext(ReactMain.java:1)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:108)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:37)
at io.reactivex.Observable.subscribe(Observable.java:12268)
... 1 more
Exception in thread "main" java.lang.NullPointerException: Actually not, but can't throw other exceptions due to RS
at io.reactivex.Observable.subscribe(Observable.java:12277)
at reactivex.ReactMain.main(ReactMain.java:36)
Caused by: java.lang.RuntimeException: I am thrown
at reactivex.ReactMain$1.onNext(ReactMain.java:22)
at reactivex.ReactMain$1.onNext(ReactMain.java:1)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:108)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:37)
at io.reactivex.Observable.subscribe(Observable.java:12268)
... 1 more
I have two questions about this.
1) I have overridden the onError method of the Observer. Why is my onError() not able to catch the exception?
2) Even if onError fails (I expect the reason in the answer to question 1), why is UndeliverableException thrown only twice? Shouldn't it have been thrown 4 times, since there are 4 other Strings in the Observable?
1. Why is my onError() not able to catch the exception?
From the onError documentation:
"Notifies the Observer that the Observable has experienced an error condition."
onError is not called because no error occurred in the source Observable. The exception was thrown in the observer's own onNext method. If you want to test onError, you need to throw the error inside the stream, for example:
source
    // <String> tells the compiler the mapped type, since the lambda never returns a value
    .<String>map(str -> { throw new RuntimeException("I am thrown: " + str); })
    .subscribe(myObserver);
The code above calls onError instead of onNext.
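To make the distinction concrete, here is a small library-free sketch of the delivery contract. It is a simplified model, not RxJava's actual implementation: `DeliveryModel`, `MiniObserver`, `emit` and the `log` list are hypothetical names invented for this illustration. The assumption it encodes is the one visible in the stack trace in the question: the source calls onNext directly, so an exception thrown there travels back up the call stack to the caller, while only failures inside the stream itself (here, the mapper) are routed to onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class DeliveryModel {

    /** Hypothetical stand-in for an observer; this is not the RxJava interface. */
    interface MiniObserver {
        void onNext(String value);
        void onError(Throwable e);
        void onComplete();
    }

    /** Records what happened so the outcome can be inspected. */
    static final List<String> log = new ArrayList<>();

    /**
     * Hypothetical source: applies the mapper to each item and pushes the result.
     * Only failures of the mapper (the "stream") are routed to onError.
     */
    static void emit(String[] items, Function<String, String> mapper, MiniObserver observer) {
        for (String item : items) {
            String mapped;
            try {
                mapped = mapper.apply(item);   // upstream work, guarded
            } catch (RuntimeException e) {
                observer.onError(e);           // error inside the stream
                return;                        // terminal: nothing more is emitted
            }
            observer.onNext(mapped);           // not guarded: observer exceptions propagate
        }
        observer.onComplete();
    }

    static MiniObserver observer(boolean throwInOnNext) {
        return new MiniObserver() {
            @Override public void onNext(String value) {
                log.add("next:" + value);
                if (throwInOnNext) {
                    throw new RuntimeException("I am thrown");
                }
            }
            @Override public void onError(Throwable e) { log.add("error:" + e.getMessage()); }
            @Override public void onComplete() { log.add("complete"); }
        };
    }

    /** Case 1: the observer throws in onNext, so the exception escapes emit(). */
    public static List<String> throwInObserver() {
        log.clear();
        try {
            emit(new String[] {"Alpha", "Beta"}, s -> s, observer(true));
        } catch (RuntimeException e) {
            log.add("escaped:" + e.getMessage());
        }
        return new ArrayList<>(log);
    }

    /** Case 2: the mapper throws, so the failure is delivered to onError. */
    public static List<String> throwInStream() {
        log.clear();
        emit(new String[] {"Alpha", "Beta"},
             s -> { throw new RuntimeException("I am thrown: " + s); },
             observer(false));
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(throwInObserver()); // [next:Alpha, escaped:I am thrown]
        System.out.println(throwInStream());   // [error:I am thrown: Alpha]
    }
}
```

In the first case onError is never invoked and "Beta" is never delivered, because the exception unwinds the emitting loop. In the second case the observer receives the failure through onError and onNext is never called.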
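2. Why is UndeliverableException thrown only twice?
I think UndeliverableException is thrown only once, and the whole output describes just that one crash. The first two traces have identical frames, so they look like the same exception printed twice (once without and once with the "Exception in thread" prefix), and the final NullPointerException has the same RuntimeException as its cause. As soon as your code exits with an error in onNext for "Alpha", nothing else should be emitted.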
Try running your code with just one element, like this:
Observable<String> source = Observable.just("Alpha");
and see if you get the same error message. Also, you can check if anything is emitted:
Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Upma", "Idly")
    .doOnNext(value -> System.out.println("EMITTED: " + value)); // log each item as it is emitted