I have the following code(*) that implements polling using a scheduler that recursively calls the supplied observable.
(*) inspired from https://github.com/ReactiveX/RxJava/issues/448
This is working correctly when I only pass the onNext
event to the subscriber. But when I pass the onError
event to the subscriber, the unsubscribe event is called and this in turn kills the scheduler.
I'd like to also pass the errors to the subscriber. Any ideas how to achieve that?
public Observable<Status> observe() {
return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS));
}
private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> {
private Subscription subscription;
private Subscription innerSubscription;
private Scheduler.Worker worker = Schedulers.newThread().createWorker();
private Observable<T> observable;
private long delayTime;
private TimeUnit unit;
public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) {
this.observable = observable;
this.delayTime = delayTime;
this.unit = unit;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
subscription = worker.schedule(new Action0() {
@Override
public void call() {
schedule(subscriber, true);
}
});
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
if (innerSubscription != null) {
innerSubscription.unsubscribe();
}
}
}));
}
private void schedule(final Subscriber<? super T> subscriber, boolean immediately) {
long delayTime = immediately ? 0 : this.delayTime;
subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit);
}
private Action0 createInnerAction(final Subscriber<? super T> subscriber) {
return new Action0() {
@Override
public void call() {
innerSubscription = observable.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
schedule(subscriber, false);
}
@Override
public void onError(Throwable e) {
// Doesn't work.
// subscriber.onError(e);
schedule(subscriber, false);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
}
};
}
}
Both onError and onCompleted are terminating events, what means that your Observable won't emit any new events after any of them occurrs. In order to swallow/handle error case see error operators - https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators. Also, in order to implement polling you might take advantage of this one - http://reactivex.io/documentation/operators/interval.html
So I've been playing with this one for some time, and I don't think it's possible in the way you're doing it. Calling onError
or onCompleted
terminate the stream, flipping the done
flag within the SafeSubscriber
wrapper, and there just isn't a way to reset it.
I can see 2 options available - neither I think are particularly elegant, but will work.
1 - UnsafeSubscribe
. Possibly not the best idea but it works, because instead of wrapping your Subscriber
in a SafeSubscriber
, it calls it directly. Best read the Javadoc to see if this is OK for you. Or, if you're feeling adventurous write your own SafeSubscriber
where you can reset the done flag or similar. With your example, call like:
observe.unsafeSubscribe(...)
2 - Implement something similar to this example. I appreciate it's in C#, but it should be readable. Simply put - you want to create a Pair<T, Exception>
class, and then rather than calling onError
, call onNext
and set the exception side of your pair. Your subscriber will have to be a little more clever to check for each side of the pair, and you might need to do some data transformation between your source Observable
and the Observable<Pair<T, Exception>>
, but I can't see why it won't work.
I'd be really interested in seeing another way of doing this if anyone has any.
Hope this helps,
Will
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With