I am new to RxJava but I am integrating it into a project that I am working on to help me learn it. I have run into a question about best practices.
I have a question about how to handle onError so that it does not stop the Observable from processing.
Here is the setup:
I have a list of userIds, and for each one I would like to make two or more network requests. If any of the network requests fails for a userId, that userId won't be updated and can be skipped. This should not prevent the other userIds from being processed. I do have a solution, but it involves nested subscribes (see the second block of code). One problem I see is that if every call fails, there is no way to short-circuit and stop the remaining calls from hitting a network resource, even after a certain threshold number of failures has been detected.
Is there a better way to do this?
In traditional code:
List<String> results = new ArrayList<String>();
for (String userId : userIds) {
    try {
        String info = getInfo(userId); // can throw a GetInfoException
        String otherInfo = getOtherInfo(userId); // can throw a GetOtherInfoException
        results.add(info + ", " + otherInfo);
    } catch (GetInfoException e) {
        log.error(e);
    } catch (GetOtherInfoException e) {
        log.error(e);
    }
}
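The threshold concern raised above is easy to express in this traditional style, which may serve as a reference point for what an Rx version would need to reproduce. The following is a minimal, JDK-only sketch under stated assumptions: `ThresholdLoop`, `process`, `maxFailures`, and the stubbed `getInfo`/`getOtherInfo` (which fail for user ids 2 and 3) are hypothetical names introduced for illustration, and a single `RuntimeException` stands in for the two exception types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ThresholdLoop {

    // Hypothetical stand-in for the first network request; fails for user "2".
    static String getInfo(String userId) {
        if (userId.equals("2")) {
            throw new IllegalStateException("getInfo failed for " + userId);
        }
        return "info" + userId;
    }

    // Hypothetical stand-in for the second network request; fails for user "3".
    static String getOtherInfo(String userId) {
        if (userId.equals("3")) {
            throw new IllegalStateException("getOtherInfo failed for " + userId);
        }
        return "other" + userId;
    }

    // Processes users in order, skipping any user whose requests fail,
    // and stops issuing requests once maxFailures users have failed.
    static List<String> process(List<String> userIds, int maxFailures) {
        List<String> results = new ArrayList<>();
        int failures = 0;
        for (String userId : userIds) {
            if (failures >= maxFailures) {
                break; // short-circuit: no more requests are made
            }
            try {
                results.add(getInfo(userId) + ", " + getOtherInfo(userId));
            } catch (RuntimeException e) {
                failures++; // this user is skipped, the loop continues
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> userIds = Arrays.asList("1", "2", "3", "4", "5", "6");
        System.out.println(process(userIds, 5).size() + " users processed with a limit of 5");
        System.out.println(process(userIds, 2).size() + " users processed with a limit of 2");
    }
}
```

With a limit of 5, users 1, 4, 5 and 6 are processed. With a limit of 2, only user 1 is processed, because users 2 and 3 both fail and the loop stops before user 4 is attempted.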
PROBLEM:
Pseudocode:
userid -> network requests -> result
1 -> a, b -> onNext(1[a, b])
2 -> a, onError -> onError
3 -> a, b -> onNext(3[a, b])
4 -> a, b -> onNext(4[a, b])
The following is a working example with a list of userIds and two info requests for each one. If you run it, you will see that it fails (the output is shown below the source code).
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {
            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {
        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = userIdObservable.flatMap(new Func1<String, Observable<String>>() {
            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ", t1, "3");
                return Observable.mergeDelayError(info1, info2);
            }
        });

        t.subscribe(new Action1<String>() {
            public void call(String t1) {
                System.out.println(t1);
            }
        }, new Action1<Throwable>() {
            public void call(Throwable t1) {
                t1.printStackTrace();
            }
        }, new Action0() {
            public void call() {
                System.out.println("onComplete");
            }
        });
    }
}
Output:
1::: 1
2::: 1
2::: 2
java.lang.Exception
    at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:32)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:266)
    at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:210)
    at rx.operators.OperationMergeDelayError$2.onSubscribe(OperationMergeDelayError.java:77)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable.onSubscribe(OperationMergeDelayError.java:171)
    at rx.operators.OperationMergeDelayError$1.onSubscribe(OperationMergeDelayError.java:64)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
    at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
    at rx.operators.OperationMap$MapObservable$1.onNext(OperationMap.java:105)
    at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
    at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.operators.OperationMap$MapObservable.onSubscribe(OperationMap.java:102)
    at rx.operators.OperationMap$2.onSubscribe(OperationMap.java:76)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
    at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
    at rx.Observable.subscribe(Observable.java:483)
Nested Subscribe Solution:
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {
            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {
        Observable<String> userIdObservable = getUserIds();
        userIdObservable.subscribe(new Action1<String>() {
            public void call(String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ", t1, "3");
                Observable.merge(info1, info2).subscribe(new Action1<String>() {
                    public void call(String t1) {
                        System.out.println(t1);
                    }
                }, new Action1<Throwable>() {
                    public void call(Throwable t1) {
                        t1.printStackTrace();
                    }
                }, new Action0() {
                    public void call() {
                        System.out.println("onComplete");
                    }
                });
            }
        });
    }
}
Output:
1::: 1
2::: 1
onComplete
java.lang.Exception
    at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
    at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
    at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
    at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
    at rx.Observable.subscribe(Observable.java:483)
    at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
    at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
    at rx.Observable$2.onNext(Observable.java:381)
    at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
    at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
    at rx.Observable.subscribe(Observable.java:367)
    at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 3
java.lang.Exception
    at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
    at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
    at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
    at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
    at rx.Observable.subscribe(Observable.java:483)
    at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
    at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
    at rx.Observable$2.onNext(Observable.java:381)
    at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
    at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
    at rx.Observable.subscribe(Observable.java:241)
    at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
    at rx.Observable.subscribe(Observable.java:367)
    at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 4
2::: 4
onComplete
1::: 5
2::: 5
onComplete
1::: 6
2::: 6
onComplete
As you can see, only the individual userIds that failed stopped processing; the rest of the userIds were processed.
I'm just looking for advice: does this solution make sense, and if not, what is the best practice?
Thanks, Alex
Since you want to ignore the error, you can try onErrorResumeNext(Observable.<String>empty()). For example:
Observable<String> info1 = getInfo("1::: ", t1, "2").onErrorResumeNext(Observable.<String>empty());
Observable<String> info2 = getInfo("2::: ", t1, "3").onErrorResumeNext(Observable.<String>empty());
return Observable.merge(info1, info2);
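One thing to note about the snippet above: because each request recovers separately, the surviving request of a failed user still gets through (for user 2, `2::: 2` is still emitted), whereas the question asks for the whole user to be skipped. Combining the two results first and recovering afterwards avoids that. The sketch below illustrates the idea with the JDK's `CompletableFuture` rather than RxJava, a deliberate swap so that it runs without extra dependencies; `SkipFailedUsers`, `fetchUser`, `fetchAll`, and this `getInfo` stub are hypothetical names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class SkipFailedUsers {

    // Hypothetical stand-in for a network request; fails when userId equals failingId.
    static CompletableFuture<String> getInfo(String prefix, String userId, String failingId) {
        return CompletableFuture.supplyAsync(() -> {
            if (userId.equals(failingId)) {
                throw new IllegalStateException("request failed for user " + userId);
            }
            return prefix + userId;
        });
    }

    // Combines both requests for one user, then recovers from a failure of
    // either request by producing an empty result for that user.
    static CompletableFuture<Optional<String>> fetchUser(String userId) {
        CompletableFuture<String> info1 = getInfo("1::: ", userId, "2");
        CompletableFuture<String> info2 = getInfo("2::: ", userId, "3");
        return info1
                .thenCombine(info2, (a, b) -> Optional.of(a + ", " + b))
                .exceptionally(e -> Optional.<String>empty());
    }

    // Starts the requests for every user, then collects the non-empty
    // results in the order of the input list.
    static List<String> fetchAll(List<String> userIds) {
        List<CompletableFuture<Optional<String>>> pending = new ArrayList<>();
        for (String userId : userIds) {
            pending.add(fetchUser(userId));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<Optional<String>> future : pending) {
            future.join().ifPresent(results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        for (String line : fetchAll(Arrays.asList("1", "2", "3", "4", "5", "6"))) {
            System.out.println(line);
        }
    }
}
```

Running `main` prints one combined line each for users 1, 4, 5 and 6; users 2 and 3 produce nothing. Because the recovery happens per user, a failure never reaches the code that iterates over the user list.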
The best practice is to use mergeDelayError(), which combines multiple Observables into one and allows error-free Observables to continue before propagating errors.
mergeDelayError behaves much like merge. The exception is when one of the Observables being merged terminates with an onError notification. If this happens with merge, the merged Observable will immediately issue an onError notification and terminate. mergeDelayError, on the other hand, will hold off on reporting the error until it has given any other non-error-producing Observables that it is merging a chance to finish emitting their items, and it will emit those itself, and will only terminate with an onError notification when all of the other merged Observables have finished.
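To make the difference concrete, here is a plain-Java model of the two behaviours described above. It is only a synchronous illustration, not how RxJava implements either operator: `DelayErrorModel`, `mergeFailFast`, `mergeDelayingErrors`, and `sampleSources` are hypothetical names, and each `Callable` stands in for a source that emits a single item or fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class DelayErrorModel {

    // Models merge: the first failing source aborts everything after it.
    static void mergeFailFast(List<Callable<String>> sources, List<String> received) throws Exception {
        for (Callable<String> source : sources) {
            received.add(source.call()); // an exception here ends the loop
        }
    }

    // Models mergeDelayError: every source gets its chance to emit,
    // and the failure is reported only after all of them have run.
    static void mergeDelayingErrors(List<Callable<String>> sources, List<String> received) throws Exception {
        Exception first = null;
        for (Callable<String> source : sources) {
            try {
                received.add(source.call());
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e); // keep later failures attached
                }
            }
        }
        if (first != null) {
            throw first; // the error is still propagated, just later
        }
    }

    // Three sample sources: "a" succeeds, the second fails, "c" succeeds.
    static List<Callable<String>> sampleSources() {
        List<Callable<String>> sources = new ArrayList<>();
        sources.add(() -> "a");
        sources.add(() -> { throw new IllegalStateException("b failed"); });
        sources.add(() -> "c");
        return sources;
    }

    public static void main(String[] args) {
        List<String> failFast = new ArrayList<>();
        try {
            mergeFailFast(sampleSources(), failFast);
        } catch (Exception e) {
            System.out.println("fail-fast error: " + e.getMessage());
        }
        System.out.println(failFast); // prints [a]

        List<String> delayed = new ArrayList<>();
        try {
            mergeDelayingErrors(sampleSources(), delayed);
        } catch (Exception e) {
            System.out.println("delayed error: " + e.getMessage());
        }
        System.out.println(delayed); // prints [a, c]
    }
}
```

Note that the delayed variant still ends with an error. This is consistent with the first output in the question, where the stream stops with an exception even though mergeDelayError is used for each user's pair of requests: delaying an error is not the same as ignoring it.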