How to ignore error and continue infinite stream?

I would like to know how to ignore exceptions and continue an infinite stream (in my case, a stream of locations).

I'm fetching the current user position (using Android-ReactiveLocation) and then sending the locations to my API (using Retrofit).

In my case, when an exception occurs during the network call (e.g. a timeout), the onError method is invoked and the stream stops. How can I avoid this?

Activity:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}

RestService:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}
Ziem asked Mar 10 '15 17:03



3 Answers

You may want to use one of the error handling operators.

  • onErrorResumeNext( ) — instructs an Observable to emit a sequence of items if it encounters an error
  • onErrorReturn( ) — instructs an Observable to emit a particular item when it encounters an error
  • onExceptionResumeNext( ) — instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
  • retry( ) — if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error
  • retryWhen( ) — if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

Especially retry and onExceptionResumeNext look promising in your case.
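
To make the retry idea concrete without pulling in RxJava, here is a minimal plain-Java sketch of a bounded retry around a single failing call. `RetrySketch`, `callWithRetry`, and `failingTwice` are hypothetical names used only for illustration. RxJava's retry resubscribes to an Observable rather than calling a function again, so treat this as an analogy, not as the library's behaviour.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetrySketch {
    // Calls the supplier up to maxAttempts times and rethrows the last failure.
    static <T> T callWithRetry(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    // Hypothetical flaky call: fails on the first two invocations, then succeeds.
    static Supplier<String> failingTwice() {
        AtomicInteger calls = new AtomicInteger();
        return () -> {
            if (calls.incrementAndGet() <= 2) {
                throw new IllegalStateException("timeout");
            }
            return "ok";
        };
    }

    public static void main(String[] args) {
        System.out.println(callWithRetry(failingTwice(), 3)); // prints "ok"
    }
}
```

With only two attempts the same call would still fail, so a retry limit alone does not guarantee that the surrounding stream survives.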

tomrozb answered Nov 02 '22 07:11



mRestService.postLocations(locations) emits one item, then completes. If an error occurs, it emits the error instead, which terminates the stream.

Because you call this method inside a flatMap, the error propagates to your "main" stream, and that stream stops as well.

What you can do is transform the error into another item (as described here: https://stackoverflow.com/a/28971140/476690), not on your main stream (as I presume you already tried) but on mRestService.postLocations(locations) itself.

This way, the error raised by the call is transformed into an item (or another observable), and the inner observable then completes without calling onError.

From the consumer's point of view, mRestService.postLocations(locations) emits one item and then completes, as if everything had succeeded.

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
        .buffer(50)
        .flatMap(locations -> mRestService.postLocations(locations)
                // the fallback must be a Response (the inner item type); RxJava 1.x accepts null
                .onErrorReturn(e -> null)) // the inner call can no longer propagate an error
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();
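
The effect of handling the error on the inner call can be sketched without RxJava. The following is only a plain-Java analogy: `FallbackPerBatchSketch`, `post`, and `postAll` are hypothetical names, and a loop with try/catch stands in for flatMap combined with onErrorReturn. It shows that a failed batch yields a fallback item while later batches are still processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FallbackPerBatchSketch {
    // Hypothetical stand-in for the network call: rejects any batch with a negative value.
    static String post(List<Integer> batch) {
        if (batch.stream().anyMatch(v -> v < 0)) {
            throw new IllegalStateException("timeout");
        }
        return "sent " + batch.size();
    }

    // Handles the failure per batch, so one bad batch never aborts the loop.
    static List<String> postAll(List<List<Integer>> batches) {
        List<String> results = new ArrayList<>();
        for (List<Integer> batch : batches) {
            try {
                results.add(post(batch));
            } catch (RuntimeException e) {
                results.add("failed"); // fallback item instead of a propagated error
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(-1), Arrays.asList(3));
        System.out.println(postAll(batches)); // prints [sent 2, failed, sent 1]
    }
}
```

If the try/catch were wrapped around the whole loop instead, the third batch would never be sent, which mirrors what happens when the error reaches the main stream.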
dwursteisen answered Nov 02 '22 08:11



If you just want to ignore the error inside the flatMap without returning an element, do this:

flatMap(item -> 
    restService.getSomething(item).onErrorResumeNext(Observable.empty())
);
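
The same "emit nothing on failure" idea can be tried out with java.util.stream, which also has a flatMap. This is a hedged analogy rather than RxJava code: `SkipFailedSketch`, `getSomething`, `getOrEmpty`, and `fetchAll` are hypothetical names, and an empty Stream plays the role of Observable.empty().

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SkipFailedSketch {
    // Hypothetical stand-in for restService.getSomething(item): fails on negative input.
    static String getSomething(int item) {
        if (item < 0) {
            throw new IllegalStateException("timeout");
        }
        return "item " + item;
    }

    // One-element stream on success, empty stream on failure.
    static Stream<String> getOrEmpty(int item) {
        try {
            return Stream.of(getSomething(item));
        } catch (RuntimeException e) {
            return Stream.empty(); // the failed item simply disappears
        }
    }

    static List<String> fetchAll(List<Integer> items) {
        return items.stream()
                .flatMap(SkipFailedSketch::getOrEmpty)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList(1, -2, 3))); // prints [item 1, item 3]
    }
}
```

Unlike a fallback item, this leaves no trace of the failure downstream, so it fits best when the consumer does not need to know which calls failed.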
Albert Vila Calvo answered Nov 02 '22 07:11
