
Best practice for handling onError and continuing processing

Tags:

rx-java

I am new to RxJava but I am integrating it into a project that I am working on to help me learn it. I have run into a question about best practices.

My question is how to handle onError so that it does not stop the rest of the Observable processing.

Here is the setup:

I have a list of userIds, and for each one I would like to make two or more network requests. If any of the network requests fails for a userId, that userId won't be updated and can be skipped. This should not prevent the other userIds from being processed. I do have a solution, but it involves nested subscribes (see the second block of code). One problem I see is that if every call fails, there is no way to short-circuit and stop the remaining userIds from hitting the network, even after a certain threshold number of failures has been detected.

Is there a better way to do this?

In traditional code:

List<String> results = new ArrayList<String>();
for (String userId : userIds) {
    try {
        String info = getInfo(userId);  // can throw a GetInfoException
        String otherInfo = getOtherInfo(userId);  // can throw a GetOtherInfoException
        results.add(info + ", " + otherInfo);
    } catch (GetInfoException e) {
        log.error(e);
    } catch (GetOtherInfoException e) {
        log.error(e);
    }
}

PROBLEM:

Pseudocode:

userid -> network requests -> result

1 -> a, b -> onNext(1[a, b])
2 -> a, onError -> onError
3 -> a, b -> onNext(3[a, b])
4 -> a, b -> onNext(4[a, b])

The following is a runnable example that takes a list of userIds and makes two requests for info for each one. If you run it, you will see that it fails partway through (see the output below the source code).

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = userIdObservable.flatMap(new Func1<String, Observable<String>>() {

            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ", t1, "3");
                return Observable.mergeDelayError(info1, info2);
            }
        });

        t.subscribe(new Action1<String>() {

            public void call(String t1) {
                System.out.println(t1);
            }
        }, new Action1<Throwable>() {

            public void call(Throwable t1) {
                t1.printStackTrace();
            }
        },
        new Action0() {

            public void call() {
                System.out.println("onComplete");
            }

        });
    }
}

Output:

1::: 1
2::: 1
2::: 2
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:32)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:266)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:210)
        at rx.operators.OperationMergeDelayError$2.onSubscribe(OperationMergeDelayError.java:77)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable.onSubscribe(OperationMergeDelayError.java:171)
        at rx.operators.OperationMergeDelayError$1.onSubscribe(OperationMergeDelayError.java:64)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationMap$MapObservable$1.onNext(OperationMap.java:105)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMap$MapObservable.onSubscribe(OperationMap.java:102)
        at rx.operators.OperationMap$2.onSubscribe(OperationMap.java:76)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)

Nested Subscribe Solution:

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        userIdObservable.subscribe(new Action1<String>() {

            public void call(String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ", t1, "3");
                Observable.merge(info1, info2).subscribe(new Action1<String>() {

                    public void call(String t1) {
                        System.out.println(t1);
                    }
                }, new Action1<Throwable>() {

                    public void call(Throwable t1) {
                        t1.printStackTrace();
                    }
                },
                        new Action0() {

                            public void call() {
                                System.out.println("onComplete");
                            }

                        });
            }
        });
    }
}

Output:

1::: 1
2::: 1
onComplete
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 3
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 4
2::: 4
onComplete
1::: 5
2::: 5
onComplete
1::: 6
2::: 6
onComplete

As you can see, only the userIds that failed had their processing stopped; the rest of the userIds were still processed.

I am looking for advice on whether this solution makes sense and, if not, what the best practice is.

Thanks, Alex

Alex Beggs asked Mar 12 '14 02:03




2 Answers

Since you want to ignore the error, you can try onErrorResumeNext(Observable.<String>empty()). For example:

Observable<String> info1 = getInfo("1::: ", t1, "2").onErrorResumeNext(Observable.<String>empty());
Observable<String> info2 = getInfo("2::: ", t1, "3").onErrorResumeNext(Observable.<String>empty());
return Observable.merge(info1, info2);
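The idea behind this answer (turn a failure into an empty result so the outer flatMap keeps going) can also be sketched in plain Java with java.util.stream, which may help to see it without the RxJava machinery. This is only an analogue, not RxJava code: the class SkipFailedUsers and the methods infoForUser and process are hypothetical names, and getInfo here is a synchronous stand-in for the question's helper. Note that this sketch drops a user as soon as either request fails, like the loop in the question, whereas the snippet above applies the fallback to each request separately, so a user's successful request would still be emitted there.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for the question's setup, using only the JDK.
class SkipFailedUsers {

    // Synchronous stand-in for the question's getInfo: fails when the
    // user id contains errorNumber, otherwise returns prefix + userId.
    static String getInfo(String prefix, String userId, String errorNumber) throws Exception {
        if (userId.contains(errorNumber)) {
            throw new Exception("request failed for user " + userId);
        }
        return prefix + userId;
    }

    // Plays the role of "on error, resume with empty": a failure for one
    // user yields an empty stream instead of aborting the whole pipeline.
    static Stream<String> infoForUser(String userId) {
        try {
            String info1 = getInfo("1::: ", userId, "2");
            String info2 = getInfo("2::: ", userId, "3");
            return Stream.of(info1, info2);
        } catch (Exception e) {
            return Stream.empty();
        }
    }

    // flatMap flattens the per-user streams; empty ones simply vanish.
    static List<String> process(List<String> userIds) {
        return userIds.stream()
                .flatMap(SkipFailedUsers::infoForUser)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        process(Arrays.asList("1", "2", "3", "4", "5", "6")).forEach(System.out::println);
    }
}
```

With the ids "1" to "6", users 2 and 3 produce nothing and the other four users each contribute two lines.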
zsxwing answered Oct 22 '22 11:10


The best practice is to use mergeDelayError(), which combines multiple Observables into one and allows error-free Observables to continue before errors are propagated.

mergeDelayError behaves much like merge. The exception is when one of the Observables being merged terminates with an onError notification. If this happens with merge, the merged Observable will immediately issue an onError notification and terminate. mergeDelayError, on the other hand, will hold off on reporting the error until it has given any other non-error-producing Observables that it is merging a chance to finish emitting their items, and it will emit those itself, and will only terminate with an onError notification when all of the other merged Observables have finished.
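The difference described above can be modelled in plain Java, without RxJava, as a rough illustration. This is a simplified sketch under stated assumptions: the sources are run one after another (real merged Observables may emit concurrently), and DelayErrorSketch, failFast, delayErrors, succeeding and failing are hypothetical names, not RxJava API. failFast stands in for merge and delayErrors for mergeDelayError.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical model of the two behaviours, using only the JDK.
class DelayErrorSketch {

    // merge-like: the first failure propagates immediately,
    // so sources after it are never run.
    static void failFast(List<Callable<String>> sources, Consumer<String> onNext) throws Exception {
        for (Callable<String> source : sources) {
            onNext.accept(source.call());
        }
    }

    // mergeDelayError-like: failures are held back while the remaining
    // sources deliver their items; the held error is thrown at the end.
    static void delayErrors(List<Callable<String>> sources, Consumer<String> onNext) throws Exception {
        Exception held = null;
        for (Callable<String> source : sources) {
            String item;
            try {
                item = source.call();
            } catch (Exception e) {
                if (held == null) {
                    held = e;
                } else {
                    held.addSuppressed(e); // keep later failures attached to the first
                }
                continue;
            }
            onNext.accept(item);
        }
        if (held != null) {
            throw held;
        }
    }

    static Callable<String> succeeding(String value) {
        return () -> value;
    }

    static Callable<String> failing(String message) {
        return () -> { throw new Exception(message); };
    }

    public static void main(String[] args) {
        List<Callable<String>> sources =
                Arrays.asList(succeeding("a"), failing("boom"), succeeding("c"));
        try {
            failFast(sources, item -> System.out.println("failFast onNext: " + item));
        } catch (Exception e) {
            System.out.println("failFast onError: " + e.getMessage());
        }
        try {
            delayErrors(sources, item -> System.out.println("delayErrors onNext: " + item));
        } catch (Exception e) {
            System.out.println("delayErrors onError: " + e.getMessage());
        }
    }
}
```

In this model failFast delivers only "a" before reporting the error, while delayErrors delivers "a" and "c" and reports the error last. In both cases the caller still receives an error at the end.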

Morteza Rastgoo answered Oct 22 '22 10:10