Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava Observable and Subscriber for skipping exception?

If I have an Observalbe :

List<Integer> ints = Lists.newArrayList(1, 2, 0, 3, 4);
Observable<Integer> o1 = Observable.from(ints);

I want to generate another observable , which divide by 12 :

Observable<Integer> o2 = o1.map(i -> 12/i);
o2.subscribe(
  v -> logger.info ("Subscriber value {}", v) ,
  t -> logger.error("Subscriber onError {} : {}", t.getClass() , t.getMessage())
);

It's obvious it will got error , and stopped when it encounter '0' :

RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber onError class java.lang.ArithmeticException : / by zero

But what if I want the Observer(o2) skip the exception ?

I look into RxJava's doc about error handling , there is no way to skip the error. The onErrorResumeNext() and onExceptionResumeNext() needs a backup/fallback Observable , which is not what I want. The onErrorReturn need to specify the return value .

All three of the error handling methods cannot resume the original observer . for example :

Observable<Integer> o2 = o1.map(i -> 12/i)
      .onErrorReturn(t -> 0);

It prints :

RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber value 0

Not printing the rest 12/3 and 12/4

The only solution seems relay in the map function :

Observable<Integer> o2 = o1.map(i -> {
  try {
    return Optional.of(12/i);
  } catch (ArithmeticException e) {
    return Optional.empty();
  }
}).filter(Optional::isPresent)
  .map(o -> (Integer) o.get());

It works , but it is cumbersome . I wonder if there's any way to easily skip any RuntimeException when manipulating Observable (such as map)

The above is about skipping exception in Observable . The following is about skipping exception in the Subscriber :

The situation is the same :

List<Integer> ints = Lists.newArrayList(1, 2, 0 , 3 , 4);
Observable<Integer> o1 = Observable.from(ints);
o1.subscribe(
  i -> logger.info("12 / {} = {}", i, 12 / i),
  t -> logger.error("{} : {}", t.getClass() , t.getMessage()),
  () -> logger.info("onCompleted")
);

It prints out :

RxTest - 12 / 1 = 12
RxTest - 12 / 2 = 6
RxTest - class java.lang.ArithmeticException : / by zero

When exception occurs in onNext , it triggers onError , and NOT RESPONDING to any data from the Observable . If I want the subscriber to swallow the exception , I have to try-catch the ArithmeticException in the onNext() . Is there any cleaner solution ?

It seems when a Subscriber faces an error in the onNext() that cannot be handled within (onNext) , it shall stop , right ? Is it a good design ?

like image 937
smallufo Avatar asked Feb 08 '15 05:02

smallufo


2 Answers

Observable.just(1, 2, 3, 0, 4)
            .flatMap(i -> Observable.defer(() -> Observable.just(12 / i))
                    .onErrorResumeNext(Observable.just(0)));

it's one way to go, though keep in mind that RxJava assumes that error is something truly unexcpeted (you can expect values to be 0). On the other hand, if you want to ignore divide by 0 exception maybe you should filter/map your values before division.

like image 195
krp Avatar answered Oct 22 '22 11:10

krp


Try this way.

Observable<Integer> o2 = o1.map(i -> 12/i).onErrorFlatMap(e -> Observable.empty());

In the above code, error events are replaced by an empty stream and the original stream is resuming. Consequently errors are skipped.

like image 37
findall Avatar answered Oct 22 '22 09:10

findall