I am trying to emit events periodically (every 150ms) even though the upstream observable will send events faster.
But I am getting MissingBackpressureException
even though I have called onBackpressureBlock()
code:
SerializedSubject<QuotationMarker, QuotationMarker> subject = new SerializedSubject<> (PublishSubject.create());
return subject
.subscribeOn(Schedulers.computation())
.doOnSubscribe(() -> {
NetworkRequestsManager.instance().queryQuotations(productId).subscribe(quotation -> {
Log.d(TAG, "new quotation " + quotation.hashCode());
NetworkRequestsManager.instance().getSeller(quotation.sellerId)
.subscribe(seller -> {
for (Outlet outlet : seller.outlets) {
if (outlet.latitude != null && outlet.longitude != null)
subject.onNext(new QuotationMarker(outlet, quotation.price));
}
},
error -> Log.fatalError(new RuntimeException(error)));
},
error -> Log.fatalError(new RuntimeException(error)));
})
.doOnError(throwable -> Log.fatalError(new RuntimeException(
"error response in subscribe after doOnSubscribe",
throwable)))
// combine with another observable that emits items regularly (every 100ms)
// so that a new event is received every 100ms :
// also, first event itself is delayed.
.zipWith(Observable.interval(150, TimeUnit.MILLISECONDS),
(seller, aLong) -> seller)
.onBackpressureBlock() // prevent zipWith Observer.interval from throwing MissingBackpressureException s
.doOnError(throwable -> Log.fatalError(new RuntimeException(
"error response after onBackpressureBlock()",
throwable))); // <-- error is thrown here
trace:
05-06 00:38:25.532 28106-28166/com.instano.buyer W/System.err﹕ java.lang.RuntimeException: error response after onBackpressureBlock()
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.lambda$fetchQuotationMarkersForProduct$59(Quotations.java:67)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.access$lambda$5(Quotations.java)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations$$Lambda$8.call(Unknown Source)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Observable$11.onError(Observable.java:4193)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.complete(OperatorOnBackpressureBlock.java:81)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:190)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.terminateAndDrain(BackpressureDrainManager.java:129)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.onError(OperatorOnBackpressureBlock.java:68)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:324)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:332)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Scheduler$Worker$1.call(Scheduler.java:120)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.FutureTask.run(FutureTask.java:234)
05-06 00:38:25.582 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
05-06 00:38:25.592 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.lang.Thread.run(Thread.java:841)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ Caused by: rx.exceptions.MissingBackpressureException
05-06 00:38:25.612 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349)
05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:330)
05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ ... 10 more
PS: Log.fatalError(err)
is just my wrapper around Android.util.Log.e(...)
EDIT
After a lot of trial and error, this is becoming a wont fix
for me. zipWith(Observable.interval...)
seems like the culprit and a possible framework bug. Removing those lines (and hence my periodic emission feature) my code works.
I am using a subject that probably calls onNext
from different threads and then I am performing Obeservable operators on it.
I think (but I'm not sure) that the problem is that your backpressure configuration is after the zip
operator.
The zip
operator need to buffer items of one Observable
to zip it with another Observable
. This is this buffer that should throw your exception. (see here)
To solve your issue, I think you should try to add backpressure configuration on one (or on each) Observable
used in the zip
operator.
example :
obs.zipWith(Observable.interval(150, TimeUnit.MILLISECONDS).onBackPressureDrop());
obs.onBackPressureBlock().zipWith(Observable.interval(150, TimeUnit.MILLISECONDS));
The answer above from @dwursteisen and @zsxwing is correct.
The interval operator is one that emits based on time and thus is "hot" and does not support backpressure. Thus, it will keep emitting and fill the internal bounded buffer of zip which causes the MissingBackpressureException.
When dealing with a "hot" source (such as one based on time, or user events) you must choose the strategy for how to deal with overflow.
In this case, you would need to put that strategy on the interval
operator.
Here is code showing what's going on and options for dealing with it:
import java.util.concurrent.TimeUnit;
import rx.Observable;
public class ZipInterval {
public static void main(String... args) {
Observable<Long> slowHotSource = Observable.interval(1, TimeUnit.SECONDS);
/** This one is fast and hot so will cause a MissingBackpressureException.
*
* This is because a "hot" source based on time does not obey backpressure
* and keeps emitting regardless of what the downstream asks for.
*
* Examples of "hot" and "cold" and approaches to both can be found at:
* https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=90 and
* https://github.com/ReactiveX/RxJava/wiki/Backpressure
* */
// Observable<Long> fastHotSource = Observable.interval(1, TimeUnit.MILLISECONDS);
/**
* The following version of 'fastHotSource' composes a simple flow control strategy.
*/
Observable<Long> fastHotSource = Observable.interval(1, TimeUnit.MILLISECONDS).onBackpressureDrop();
Observable<String> zipped = Observable.zip(slowHotSource, fastHotSource, (s, f) -> {
return s + " " + f;
});
// subscribe to the output
System.out.println("---- zip output");
zipped.take(10).toBlocking().forEach(System.out::println);
/**
* The outcome of the above is probably not what is expected though.
*
* This is because zip will buffer the output and then `fastHotSource` will drop until
* the zip buffer asks for more.
*
* For temporal or "hot" sources like this, using `withLatestFrom` or `combineLatest`
* is often more appropriate than `zip`.
*/
Observable<String> latest = slowHotSource.withLatestFrom(fastHotSource, (s, f) -> {
return s + " " + f;
});
// subscribe to the output
System.out.println("---- latest output");
latest.take(10).toBlocking().forEach(System.out::println);
}
}
The output of this is:
---- zip output
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
---- latest output
0 1002
1 2002
2 3000
3 4001
4 5003
5 6001
6 7000
7 8002
8 9005
9 10000
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With