Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava: calling onError without finishing / unsubscribing

I have the following code(*) that implements polling using a scheduler that recursively calls the supplied observable.

(*) inspired from https://github.com/ReactiveX/RxJava/issues/448

This is working correctly when I only pass the onNext event to the subscriber. But when I pass the onError event to the subscriber, the unsubscribe event is called and this in turn kills the scheduler.

I'd like to also pass the errors to the subscriber. Any ideas how to achieve that?

public Observable<Status> observe() {
    return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS));
}

private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> {
    private Subscription subscription;
    private Subscription innerSubscription;
    private Scheduler.Worker worker = Schedulers.newThread().createWorker();

    private Observable<T> observable;
    private long delayTime;
    private TimeUnit unit;

    public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) {
        this.observable = observable;
        this.delayTime = delayTime;
        this.unit = unit;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        subscription = worker.schedule(new Action0() {
            @Override
            public void call() {
                schedule(subscriber, true);
            }
        });

        subscriber.add(Subscriptions.create(new Action0() {
            @Override
            public void call() {
                subscription.unsubscribe();
                if (innerSubscription != null) {
                    innerSubscription.unsubscribe();
                }
            }
        }));
    }

    private void schedule(final Subscriber<? super T> subscriber, boolean immediately) {
        long delayTime = immediately ? 0 : this.delayTime;
        subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit);
    }

    private Action0 createInnerAction(final Subscriber<? super T> subscriber) {
        return new Action0() {
            @Override
            public void call() {
                innerSubscription = observable.subscribe(new Observer<T>() {
                    @Override
                    public void onCompleted() {
                        schedule(subscriber, false);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Doesn't work.
                        // subscriber.onError(e);
                        schedule(subscriber, false);
                    }

                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                });
            }
        };
    }
}
like image 853
Jaap van Hengstum Avatar asked Feb 06 '15 13:02

Jaap van Hengstum


2 Answers

Both onError and onCompleted are terminating events, what means that your Observable won't emit any new events after any of them occurrs. In order to swallow/handle error case see error operators - https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators. Also, in order to implement polling you might take advantage of this one - http://reactivex.io/documentation/operators/interval.html

like image 125
krp Avatar answered Jan 19 '23 01:01

krp


So I've been playing with this one for some time, and I don't think it's possible in the way you're doing it. Calling onError or onCompleted terminate the stream, flipping the done flag within the SafeSubscriber wrapper, and there just isn't a way to reset it.

I can see 2 options available - neither I think are particularly elegant, but will work.

1 - UnsafeSubscribe. Possibly not the best idea but it works, because instead of wrapping your Subscriber in a SafeSubscriber, it calls it directly. Best read the Javadoc to see if this is OK for you. Or, if you're feeling adventurous write your own SafeSubscriber where you can reset the done flag or similar. With your example, call like:

observe.unsafeSubscribe(...)

2 - Implement something similar to this example. I appreciate it's in C#, but it should be readable. Simply put - you want to create a Pair<T, Exception> class, and then rather than calling onError, call onNext and set the exception side of your pair. Your subscriber will have to be a little more clever to check for each side of the pair, and you might need to do some data transformation between your source Observable and the Observable<Pair<T, Exception>>, but I can't see why it won't work.

I'd be really interested in seeing another way of doing this if anyone has any.

Hope this helps,

Will

like image 44
Will Avatar answered Jan 19 '23 02:01

Will