Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava default error handler

Sorry, is there a possibility to set default error handler in RxJava?

For instance, I some code in Utils.kt file:

fun BaseFragment.callGallery(view: View){
    view.clicks().bindToLifecycle(this).subscribe {
        RxPaparazzo.takeImage(this)
        .usingGallery()
        .subscribe { response ->
            throw RuntimeException("Where is this exception from?")
        }
    }
}

But in stacktrace there's no any hint about Utils.kt or about any of my file.

I understand that I can set onError in every subscriber. with code like:

.subscribe ({ response ->
    ....
}, { it.printStackTrace() })

But I'd prefer to set one default onError for all subscribers. How can I do it?

Stacktrace:

05-07 12:11:48.246 10966-10966/ru.egslava.rxfluxtest E/AndroidRuntime: FATAL EXCEPTION: main
 Process: ru.egslava.rxfluxtest, PID: 10966
 java.lang.RuntimeException: Unable to destroy activity {ru.egslava.rxfluxtest/rx_activity_result.HolderActivity}: rx.exceptions.OnErrorNotImplementedException: Where is this exception from?
     at android.app.ActivityThread.performDestroyActivity(ActivityThread.java:3831)
     at android.app.ActivityThread.handleDestroyActivity(ActivityThread.java:3849)
     at android.app.ActivityThread.-wrap5(ActivityThread.java)
     at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1398)
     at android.os.Handler.dispatchMessage(Handler.java:102)
     at android.os.Looper.loop(Looper.java:148)
     at android.app.ActivityThread.main(ActivityThread.java:5417)
     at java.lang.reflect.Method.invoke(Native Method)
     at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
     at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
  Caused by: rx.exceptions.OnErrorNotImplementedException: Where is this exception from?
     at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:374)
     at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:371)
     at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
     at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
     at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
     at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:204)
     at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:144)
     at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext(OperatorOnErrorResumeNextViaFunction.java:153)
     at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
     at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:391)
     at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:353)
     at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:838)
     at rx.observers.Subscribers$5.onNext(Subscribers.java:229)
     at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:264)
     at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:335)
     at rx.internal.operators.OperatorMap$MapSubscriber.onNext(OperatorMap.java:74)
     at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:268)
     at rx.Subscriber.setProducer(Subscriber.java:211)
     at rx.internal.operators.OperatorMap$MapSubscriber.setProducer(OperatorMap.java:99)
     at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:79)
     at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:75)
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
     at rx.Observable.unsafeSubscribe(Observable.java:8452)
     at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:214)
     at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:156)
     at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:122)
     at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:268)
     at rx.Subscriber.setProducer(Subscriber.java:209)
     at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:79)
     at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:75)
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50)
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
     at rx.Observable.unsafeSubscribe(Observable.java:8452)
     at rx.internal.util.ScalarSynchronousObservable$4.call(ScalarSynchronousObservable.java:227)
     at rx.internal.util.ScalarSynchronousObservable$4.call(ScalarSynchronousObservable.java:220)
     at rx.Observable.unsafeSubscribe(Observable.java:8452)
     at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:248)
like image 760
Slava Avatar asked May 07 '16 09:05

Slava


People also ask

What is Completable RxJava?

Completable is only concerned with execution completion whether the task has reach to completion or some error has occurred. interface CompletableObserver<T> { void onSubscribe(Disposable d); void onComplete(); void onError(Throwable error);

What is RxJava in Android example?

RxJava is a JVM library that uses observable sequences to perform asynchronous and event-based programming. Its primary building blocks are triple O's, which stand for Operator, Observer, and Observables. And we use them to complete asynchronous tasks in our project. It greatly simplifies multithreading in our project.

What is RX Java?

RxJava is a Java library that enables Functional Reactive Programming in Android development. It raises the level of abstraction around threading in order to simplify the implementation of complex concurrent behavior.


1 Answers

RxJava has Plugins to support cross cutting concerns nicely. In particular RxJavaErrorHandler is desiged to attached global error handling behavior like so:

RxJavaPlugins.getInstance().registerErrorHandler(object : RxJavaErrorHandler() {
    override fun handleError(e: Throwable?) {
        println("Global error handler: $e")
    }
})

Observable.just(1).concatMap({ Observable.error<Int>(Exception("Just throwing $it")) })
        .subscribe({
            println("I'll not be called")
        }, {
            println("Specific error handler: $it")
        })

The above code would print:

Error occurred java.lang.Exception: Just throwing 1
A default error handler: java.lang.Exception: Just throwing 1

I suspect that you'd also be interested in improving diagnostic information available in stacktraces when you do have an unhandled error. For that there's a RxJavaStackTracer that when used RxJavaPlugins.getInstance().registerObservableExecutionHook(RxJavaStackTracer()) enhaces stack trace information. The Stacktraces and subscribeOn/observeOn issue on github is a good read on the topic.

like image 166
miensol Avatar answered Sep 20 '22 01:09

miensol