I'm using RxJava to calculate the normalized auto correlation over some sensor data in Android. Strangely enough, my code throws an exception ("java.lang.IllegalStateException: Only one subscriber allowed!") and I'm unsure what to make of it: I know that GroupedObservables might throw this exception when subscribed upon my multiple subscribers, but I don't think I'm using such a thing anywhere.
Below you find the method that (most likely) triggers the exception:
public Observable<Float> normalizedAutoCorrelation(Observable<Float> observable, final int lag) {
Observable<Float> laggedObservable = observable.skip(lag);
Observable<Float> meanObservable = mean(observable, lag);
Observable<Float> laggedMeanObservable = mean(laggedObservable, lag);
Observable<Float> standardDeviationObservable = standardDeviation(observable, meanObservable, lag);
Observable<Float> laggedStandardDeviationObservable = standardDeviation(laggedObservable, laggedMeanObservable, lag);
Observable<Float> deviation = observable.zipWith(meanObservable, new Func2<Float, Float, Float>() {
@Override
public Float call(Float value, Float mean) {
return value - mean;
}
});
Observable<Float> laggedDeviation = observable.zipWith(laggedMeanObservable, new Func2<Float, Float, Float>() {
@Override
public Float call(Float value, Float mean) {
return value - mean;
}
});
Observable<Float> autoCorrelationPartObservable = deviation.zipWith(laggedDeviation, new Func2<Float, Float, Float>() {
@Override
public Float call(Float value, Float laggedValue) {
return value * laggedValue;
}
});
Observable<Float> autoCorrelationObservable = flatten(autoCorrelationPartObservable.window(lag, 1).scan(new Func2<Observable<Float>, Observable<Float>, Observable<Float>>() {
@Override
public Observable<Float> call(Observable<Float> memoObservable, Observable<Float> observable) {
if(memoObservable == null) return observable;
return memoObservable.zipWith(observable, new Func2<Float, Float, Float>() {
@Override
public Float call(Float memo, Float value) {
return memo + value;
}
});
}
}));
Observable<Float> normalizationObservable = standardDeviationObservable.zipWith(laggedStandardDeviationObservable, new Func2<Float, Float, Float>() {
@Override
public Float call(Float standardDeviation, Float laggedStandardDeviation) {
return lag * standardDeviation * laggedStandardDeviation;
}
});
return autoCorrelationObservable.zipWith(normalizationObservable, new Func2<Float, Float, Float>() {
@Override
public Float call(Float autoCorrelation, Float normalization) {
return autoCorrelation / normalization;
}
});
}
And this is the stacktrace I get:
java.lang.IllegalStateException: Only one subscriber allowed!
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124)
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:210)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:154)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:120)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:110)
at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:173)
at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:98)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber.java:161)
at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:183)
at rx.internal.operators.OperatorSkip$1.onNext(OperatorSkip.java:58)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:121)
at com.github.joopaue.smartphonesensing.SensorService$3.onSensorChanged(SensorService.java:102)
at android.hardware.SystemSensorManager$SensorEventQueue.dispatchSensorEvent(SystemSensorManager.java:418)
at android.os.MessageQueue.nativePollOnce(Native Method)
at android.os.MessageQueue.next(MessageQueue.java:138)
at android.os.Looper.loop(Looper.java:123)
at android.app.ActivityThread.main(ActivityThread.java:5146)
at java.lang.reflect.Method.invokeNative(Native Method)
at java.lang.reflect.Method.invoke(Method.java:515)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:732)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:566)
at dalvik.system.NativeStart.main(Native Method)
I don't think I'm doing anything strange here: some zips, reduces, scans and flatmaps.
Am I missing something completely obvious, is there some hidden rule that I'm breaking here or is it a bug in RxJava? Thanks!
PS. If some code is lacking for you to be able to draw your conclusions, just ask and I'll post!
In RxJava, the operators groupBy
and window
return an observable which can be subscribed to only once and if subscribed, they replay their accumulated contents to the sole subscriber and switch to 'hot' mode.
This was a tradeoff between returning a fully hot observable and risk missing values or return an unbounded replaying observable that allows any subscribers but retains the accumulated contents indefinitely.
The middle ground, namely a single subscriber, cold-then-hot observable is thought to be the least surprising behavior and gives the developer the option to apply further operators and pick any point between the two extremes:
source.window(1, TimeUnit.SECONDS)
.map(w -> w.publish())
.doOnNext(w -> w.connect())
.subscribe(...)
source.window(1, TimeUnit.SECONDS)
.map(w -> w.cache())
.subscribe(...)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With