As I understand it, in RxJava2 values.take(1) creates another Observable that contains only the first element of the original Observable. It MUST NOT throw an exception, because the error is emitted second and should be filtered out by take(1), as in the following code snippet:
Observable<Integer> values = Observable.create(o -> {
    o.onNext(1);
    o.onError(new Exception("Oops"));
});
values.take(1)
    .subscribe(
        System.out::println,
        e -> System.out.println("Error: " + e.getMessage()),
        () -> System.out.println("Completed")
    );
Output:
1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
The fact that the observable has ended does not mean that the code running inside create(...) is stopped. To be fully safe in this case you need to use o.isDisposed() to see if the observable has ended downstream.
The exception is there because RxJava 2 never allows an onError call to be lost. It is either delivered downstream or thrown as a global UndeliverableException if the observable has already terminated. It is up to the creator of the Observable to 'properly' handle the case where the observable has ended and an Exception occurs.
The problem is the producer (Observable) and the consumer (Subscriber) disagreeing on when the stream ends. Since the producer is outliving the consumer in this case, the problem can only be fixed in the producer.
@Kiskae, in the previous comment, correctly explained why such an exception can occur.
Here is the link to the official documentation on this topic: RxJava2-wiki.
Sometimes you cannot change this behaviour, so there is a way to handle these UndeliverableExceptions. Here is a code snippet showing how to avoid crashes and misbehaviour:
import io.reactivex.exceptions.UndeliverableException;
import io.reactivex.plugins.RxJavaPlugins;
import java.io.IOException;
import java.net.SocketException;

RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        // unwrap to look at the exception that could not be delivered
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that's likely a bug in the application
        // (Thread.UncaughtExceptionHandler exposes uncaughtException, not handleException)
        Thread.currentThread().getUncaughtExceptionHandler()
            .uncaughtException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .uncaughtException(Thread.currentThread(), e);
        return;
    }
    // Log is a placeholder for whatever logger the application uses
    Log.warning("Undeliverable exception received, not sure what to do", e);
});
This code is taken from the link above.
Important note: this approach sets a global error handler for RxJava, so if you can get rid of these exceptions at their source, that is the better option.
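For the snippet in the question, getting rid of the exception means fixing the producer, as the first answer suggests. The following is a minimal sketch of that idea, assuming RxJava 2 is on the classpath; the class name GuardedProducer and the run() helper are made up for illustration. It checks o.isDisposed() before calling onError, so nothing is signalled once take(1) has disposed the emitter.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: the question's pipeline with a guarded producer.
class GuardedProducer {

    // Runs the pipeline and returns the events the subscriber received, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Observable<Integer> values = Observable.create(o -> {
            o.onNext(1);
            // take(1) disposes the emitter after the first item,
            // so only signal the error if someone is still listening.
            if (!o.isDisposed()) {
                o.onError(new Exception("Oops"));
            }
        });

        values.take(1)
            .subscribe(
                v -> events.add(String.valueOf(v)),
                e -> events.add("Error: " + e.getMessage()),
                () -> events.add("Completed")
            );
        return events;
    }

    public static void main(String[] args) {
        // Prints "1" and then "Completed", with no UndeliverableException.
        run().forEach(System.out::println);
    }
}
```

Without take(1) the emitter is still active when the error is raised, so the subscriber's error callback receives "Oops" as usual. Later 2.x releases also appear to offer tryOnError on the emitter, which returns false instead of routing the error to the global handler; check the Javadoc of the version you use before relying on it.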
Kotlin
I call this in the onCreate method of MainActivity:
private fun initRxErrorHandler() {
    RxJavaPlugins.setErrorHandler { throwable ->
        if (throwable is UndeliverableException) {
            // forward the wrapped cause to the thread's uncaught exception handler
            throwable.cause?.let {
                Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), it)
                return@setErrorHandler
            }
        }
        if (throwable is IOException || throwable is SocketException) {
            // fine, irrelevant network problem or API that throws on cancellation
            return@setErrorHandler
        }
        if (throwable is InterruptedException) {
            // fine, some blocking code was interrupted by a dispose call
            return@setErrorHandler
        }
        if (throwable is NullPointerException || throwable is IllegalArgumentException) {
            // that's likely a bug in the application
            Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
            return@setErrorHandler
        }
        if (throwable is IllegalStateException) {
            // that's a bug in RxJava or in a custom operator
            Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
            return@setErrorHandler
        }
        Log.w("Undeliverable exception", throwable)
    }
}