Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to continue stream if some observable throws exception?

I have to insert a list of documents in couchbase. I am using RxJava and asyncbucket to do so using the below mentioned code. The retryWhen function tries 3 times exponentially to save the document.

My question is : what happens if it fails? Will the other list continue to save or error will be thrown and the observable will stop? If yes, how do I make the observable continue to try saving the remaining list?

Observable
    .from(docs)
    .subscribeOn(Schedulers.io())
    .flatMap(docToInsert->asyncBucket.insert(docToInsert).retryWhen(retryFunc()))
    .toBlocking()
    .last();

public static RetryWhenFunction retryFunc() {
    return RetryBuilder.anyOf(TemporaryFailureException.class,RequestCancelledException.class,
            TimeoutException.class).delay(Delay.exponential(TimeUnit.SECONDS, 10)).max(3).build();
}
like image 262
humbleCoder Avatar asked Mar 22 '26 02:03

humbleCoder


1 Answers

You can change an error into a completion, which means a failure fetching one document will not affect others.

Change to:

...
.flatMap(docToInsert->asyncBucket.insert(docToInsert)
                        .retryWhen(retryFunc())
                        .onErrorResumeNext(Observable.empty())
...

Using onErrorResumeNext() you are announcing that this observable chain is completing normally, so it won't affect any other chain.

like image 197
Bob Dalgleish Avatar answered Mar 24 '26 23:03

Bob Dalgleish