I have a PublishSubject
and a Subscriber
which I use to process a (possibly) infinite stream of preprocessed data. The problem is that some of the elements might contain some error. I'd like to ignore them and continue processing. How can I do so? I've tried something like this:
val subject = PublishSubject.create<String>()
subject.retry().subscribe({
println("next: $it")
}, {
println("error")
}, {
println("complete")
})
subject.onNext("foo")
subject.onNext("bar")
subject.onError(RuntimeException())
subject.onNext("wom")
subject.onComplete()
My problem is that none of the error handling methods help me out here:
onErrorResumeNext()
— instructs an Observable to emit a sequence of items if it encounters an error
onErrorReturn( )
— instructs an Observable to emit a particular item when it encounters an error
onExceptionResumeNext( )
— instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
retry( )
— if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error
retryWhen( )
— if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source
I tried retry()
for example but it hangs my process after the error indefinitely.
I also tried onErrorResumeNext()
but it does not work as expected:
val backupSubject = PublishSubject.create<String>()
val subject = PublishSubject.create<String>()
var currentSubject = subject
subject.onErrorResumeNext(backupSubject).subscribe({
println("next: $it")
}, {
println("error")
currentSubject = backupSubject
}, {
println("complete")
})
backupSubject.subscribe({
println("backup")
}, {
println("backup error")
})
currentSubject.onNext("foo")
currentSubject.onNext("bar")
currentSubject.onError(RuntimeException())
currentSubject.onNext("wom")
currentSubject.onComplete()
This only prints foo
and bar
.
Retry on Error The normal sequence may be broken by a temporary system failure or backend error. In these situations, we want to retry and wait until the sequence is fixed. Luckily, RxJava gives us options to perform exactly that. 4.1. Retry
All of the streams are terminated and thus the streams do not emit any new values. That is why an error event is also a terminate event in RxJava.
As we say, we have an operator for almost everything in Rxjava. Correct ? In this blog, we will learn how to properly handle errors using RxJava operators in your android project.
Action on Error With doOnError, we can invoke whatever action that is needed when there is an error: In case of an exception being thrown while performing the action, RxJava wraps the exception in a CompositeException: 3.2. Resume With Default Items
If you want to continue processing after an error, it means your error is a value just like your String
s and should go through onNext
. To ensure type safety in this case, you should use some form of wrapper that can either take a regular value or an error; for example, the io.reactivex.Notification<T>
is available in RxJava 2:
PublishSubject<Notification<String>> subject = PublishSubject.create();
subject.subscribe(System.out::println);
subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With