Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava: "java.lang.IllegalStateException: Only one subscriber allowed!"

I'm using RxJava to calculate the normalized auto correlation over some sensor data in Android. Strangely enough, my code throws an exception ("java.lang.IllegalStateException: Only one subscriber allowed!") and I'm unsure what to make of it: I know that GroupedObservables might throw this exception when subscribed upon my multiple subscribers, but I don't think I'm using such a thing anywhere.

Below you find the method that (most likely) triggers the exception:

public Observable<Float> normalizedAutoCorrelation(Observable<Float> observable, final int lag) {
    Observable<Float> laggedObservable = observable.skip(lag);

    Observable<Float> meanObservable = mean(observable, lag);
    Observable<Float> laggedMeanObservable = mean(laggedObservable, lag);

    Observable<Float> standardDeviationObservable = standardDeviation(observable, meanObservable, lag);
    Observable<Float> laggedStandardDeviationObservable = standardDeviation(laggedObservable, laggedMeanObservable, lag);

    Observable<Float> deviation = observable.zipWith(meanObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float mean) {
            return value - mean;
        }
    });

    Observable<Float> laggedDeviation = observable.zipWith(laggedMeanObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float mean) {
            return value - mean;
        }
    });

    Observable<Float> autoCorrelationPartObservable = deviation.zipWith(laggedDeviation, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float laggedValue) {
            return value * laggedValue;
        }
    });

    Observable<Float> autoCorrelationObservable = flatten(autoCorrelationPartObservable.window(lag, 1).scan(new Func2<Observable<Float>, Observable<Float>, Observable<Float>>() {
        @Override
        public Observable<Float> call(Observable<Float> memoObservable, Observable<Float> observable) {
            if(memoObservable == null) return observable;

            return memoObservable.zipWith(observable, new Func2<Float, Float, Float>() {
                @Override
                public Float call(Float memo, Float value) {
                    return memo + value;
                }
            });
        }
    }));

    Observable<Float> normalizationObservable = standardDeviationObservable.zipWith(laggedStandardDeviationObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float standardDeviation, Float laggedStandardDeviation) {
            return lag * standardDeviation * laggedStandardDeviation;
        }
    });

    return autoCorrelationObservable.zipWith(normalizationObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float autoCorrelation, Float normalization) {
            return autoCorrelation / normalization;
        }
    });
}

And this is the stacktrace I get:

java.lang.IllegalStateException: Only one subscriber allowed!
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124)
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:210)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:154)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:120)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:110)
at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:173)
at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
  at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
  at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:98)
  at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
  at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
  at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
  at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber.java:161)
  at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:183)
  at rx.internal.operators.OperatorSkip$1.onNext(OperatorSkip.java:58)
  at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
  at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
  at rx.subjects.PublishSubject.onNext(PublishSubject.java:121)
  at com.github.joopaue.smartphonesensing.SensorService$3.onSensorChanged(SensorService.java:102)
  at android.hardware.SystemSensorManager$SensorEventQueue.dispatchSensorEvent(SystemSensorManager.java:418)
  at android.os.MessageQueue.nativePollOnce(Native Method)
  at android.os.MessageQueue.next(MessageQueue.java:138)
  at android.os.Looper.loop(Looper.java:123)
  at android.app.ActivityThread.main(ActivityThread.java:5146)
  at java.lang.reflect.Method.invokeNative(Native Method)
  at java.lang.reflect.Method.invoke(Method.java:515)
  at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:732)
  at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:566)
  at dalvik.system.NativeStart.main(Native Method)

I don't think I'm doing anything strange here: some zips, reduces, scans and flatmaps.

Am I missing something completely obvious, is there some hidden rule that I'm breaking here or is it a bug in RxJava? Thanks!

PS. If some code is lacking for you to be able to draw your conclusions, just ask and I'll post!

like image 816
joost Avatar asked May 27 '15 19:05

joost


1 Answers

In RxJava, the operators groupBy and window return an observable which can be subscribed to only once and if subscribed, they replay their accumulated contents to the sole subscriber and switch to 'hot' mode.

This was a tradeoff between returning a fully hot observable and risk missing values or return an unbounded replaying observable that allows any subscribers but retains the accumulated contents indefinitely.

The middle ground, namely a single subscriber, cold-then-hot observable is thought to be the least surprising behavior and gives the developer the option to apply further operators and pick any point between the two extremes:

source.window(1, TimeUnit.SECONDS)
    .map(w -> w.publish())
    .doOnNext(w -> w.connect())
    .subscribe(...)

source.window(1, TimeUnit.SECONDS)
    .map(w -> w.cache())
    .subscribe(...)
like image 141
akarnokd Avatar answered Nov 14 '22 21:11

akarnokd