
How to continue processing after an error happens in RxJava 2?

I have a PublishSubject and a Subscriber that I use to process a (possibly infinite) stream of preprocessed data. The problem is that some of the elements might contain errors. I'd like to ignore those and continue processing. How can I do that? I've tried something like this:

    val subject = PublishSubject.create<String>()
    subject.retry().subscribe({
        println("next: $it")
    }, {
        println("error")
    }, {
        println("complete")
    })

    subject.onNext("foo")
    subject.onNext("bar")
    subject.onError(RuntimeException())
    subject.onNext("wom")
    subject.onComplete()

My problem is that none of the error handling methods help me out here:

onErrorResumeNext(): instructs an Observable to emit a sequence of items if it encounters an error

onErrorReturn(): instructs an Observable to emit a particular item when it encounters an error

onExceptionResumeNext(): instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)

retry(): if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error

retryWhen(): if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

I tried retry(), for example, but after the error it hangs my process indefinitely.

I also tried onErrorResumeNext() but it does not work as expected:

    val backupSubject = PublishSubject.create<String>()
    val subject = PublishSubject.create<String>()
    var currentSubject = subject
    subject.onErrorResumeNext(backupSubject).subscribe({
        println("next: $it")
    }, {
        println("error")
        currentSubject = backupSubject
    }, {
        println("complete")
    })

    backupSubject.subscribe({
        println("backup")
    }, {
        println("backup error")
    })

    currentSubject.onNext("foo")
    currentSubject.onNext("bar")
    currentSubject.onError(RuntimeException())
    currentSubject.onNext("wom")
    currentSubject.onComplete()

This only prints foo and bar.

Adam Arold asked Mar 14 '17 13:03




1 Answer

If you want to continue processing after an error, it means your error is a value just like your Strings and should go through onNext. To ensure type safety in this case, you should use some form of wrapper that can either take a regular value or an error; for example, the io.reactivex.Notification<T> is available in RxJava 2:

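    import io.reactivex.Notification;
    import io.reactivex.subjects.PublishSubject;

    // Errors travel through onNext as wrapped values, so the stream never terminates.
    PublishSubject<Notification<String>> subject = PublishSubject.create();

    subject.subscribe(System.out::println);

    subject.onNext(Notification.createOnNext("Hello"));
    subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
    subject.onNext(Notification.createOnNext("World"));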
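
How a consumer might tell good elements from bad ones can be sketched without the library. The following is only an illustration of the "error as a value" idea: `Event` is a hypothetical stand-in for `Notification<T>`, and `process` and `demo` are made-up names. With RxJava 2 itself, the corresponding checks on a `Notification` would be `isOnNext()` / `getValue()` and `isOnError()` / `getError()`.

```java
import java.util.ArrayList;
import java.util.List;

class ErrorAsValueSketch {

    /** Hypothetical stand-in for Notification<T>: carries either a value or an error. */
    static final class Event<T> {
        private final T value;
        private final Throwable error;

        private Event(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> Event<T> next(T value) {
            return new Event<>(value, null);
        }

        static <T> Event<T> error(Throwable error) {
            return new Event<>(null, error);
        }

        boolean isError() {
            return error != null;
        }

        T getValue() {
            return value;
        }

        Throwable getError() {
            return error;
        }
    }

    /** Handles every event in order; a bad element is reported and skipped, not fatal. */
    static List<String> process(List<Event<String>> events) {
        List<String> log = new ArrayList<>();
        for (Event<String> event : events) {
            if (event.isError()) {
                log.add("skipped: " + event.getError().getMessage());
            } else {
                log.add("next: " + event.getValue());
            }
        }
        return log;
    }

    /** Mirrors the sequence from the question, with the error sent as a wrapped value. */
    static List<String> demo() {
        List<Event<String>> events = new ArrayList<>();
        events.add(Event.next("foo"));
        events.add(Event.next("bar"));
        events.add(Event.error(new RuntimeException("oops")));
        events.add(Event.next("wom"));
        return process(events);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because the error is an ordinary element here, "wom" is still processed after it. In the original code, the call to the subject's onError terminated the stream, so nothing sent afterwards could reach the subscriber.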
akarnokd answered Sep 28 '22 07:09
