MissingBackpressureException even after calling onBackpressureBlock()

I am trying to emit events at a fixed rate (one every 150 ms), even though the upstream observable sends events faster than that.

However, I get a MissingBackpressureException even though I call onBackpressureBlock().

code:

    SerializedSubject<QuotationMarker, QuotationMarker> subject = new SerializedSubject<> (PublishSubject.create());

    return subject
            .subscribeOn(Schedulers.computation())
            .doOnSubscribe(() -> {
                NetworkRequestsManager.instance().queryQuotations(productId).subscribe(quotation -> {
                            Log.d(TAG, "new quotation " + quotation.hashCode());
                            NetworkRequestsManager.instance().getSeller(quotation.sellerId)
                                    .subscribe(seller -> {
                                                for (Outlet outlet : seller.outlets) {
                                                    if (outlet.latitude != null && outlet.longitude != null)
                                                        subject.onNext(new QuotationMarker(outlet, quotation.price));
                                                }
                                            },
                                            error -> Log.fatalError(new RuntimeException(error)));
                        },
                        error -> Log.fatalError(new RuntimeException(error)));

            })
            .doOnError(throwable -> Log.fatalError(new RuntimeException(
                    "error response in subscribe after doOnSubscribe",
                    throwable)))
            // combine with another observable that emits items regularly (every 150ms)
            // so that a new event is received every 150ms;
            // also, the first event itself is delayed.
            .zipWith(Observable.interval(150, TimeUnit.MILLISECONDS),
                    (seller, aLong) -> seller)
            .onBackpressureBlock() // intended to prevent zipWith(Observable.interval(...)) from throwing MissingBackpressureException
            .doOnError(throwable -> Log.fatalError(new RuntimeException(
                    "error response after onBackpressureBlock()",
                    throwable))); // <-- error is thrown here

trace:

    05-06 00:38:25.532  28106-28166/com.instano.buyer W/System.err﹕ java.lang.RuntimeException: error response after onBackpressureBlock()
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.lambda$fetchQuotationMarkersForProduct$59(Quotations.java:67)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.access$lambda$5(Quotations.java)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations$$Lambda$8.call(Unknown Source)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.Observable$11.onError(Observable.java:4193)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.complete(OperatorOnBackpressureBlock.java:81)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:190)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.terminateAndDrain(BackpressureDrainManager.java:129)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.onError(OperatorOnBackpressureBlock.java:68)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:324)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:332)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.Scheduler$Worker$1.call(Scheduler.java:120)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390)
    05-06 00:38:25.572  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.FutureTask.run(FutureTask.java:234)
    05-06 00:38:25.582  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
    05-06 00:38:25.592  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    05-06 00:38:25.602  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
    05-06 00:38:25.602  28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
    05-06 00:38:25.602  28106-28166/com.instano.buyer W/System.err﹕ at java.lang.Thread.run(Thread.java:841)
    05-06 00:38:25.602  28106-28166/com.instano.buyer W/System.err﹕ Caused by: rx.exceptions.MissingBackpressureException
    05-06 00:38:25.612  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349)
    05-06 00:38:25.642  28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:330)
    05-06 00:38:25.642  28106-28166/com.instano.buyer W/System.err﹕ ... 10 more

PS: Log.fatalError(err) is just my wrapper around android.util.Log.e(...)

EDIT

After a lot of trial and error, this is becoming a "won't fix" for me. zipWith(Observable.interval(...)) seems to be the culprit, and possibly a framework bug. If I remove those lines (and with them my periodic emission feature), my code works. I am using a subject whose onNext is probably called from different threads, and I then apply Observable operators to it.

Vedant Agarwala asked Sep 29 '22 01:09


2 Answers

I think (but I'm not sure) that the problem is that your backpressure configuration comes after the zip operator.

The zip operator needs to buffer items from one Observable in order to zip them with items from the other Observable. It is this buffer that throws your exception.

To solve your issue, I think you should try adding the backpressure configuration to one (or each) of the Observables used in the zip operator.

Example:

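    // drop interval ticks that zip cannot consume yet
    obs.zipWith(Observable.interval(150, TimeUnit.MILLISECONDS).onBackpressureDrop(), (item, tick) -> item);

    // or apply the strategy to the source before it reaches zip
    obs.onBackpressureBlock().zipWith(Observable.interval(150, TimeUnit.MILLISECONDS), (item, tick) -> item);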
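
To illustrate why the position of the backpressure operator matters, here is a minimal plain-Java sketch of the idea. It is not RxJava's actual implementation: `BoundedInput` is a hypothetical stand-in for the bounded buffer that zip keeps for each input, and the capacity of 4 is an arbitrary value chosen for illustration. A producer that ignores demand overflows the buffer, while a drop strategy placed in front of the buffer discards the excess instead.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ZipBufferSketch {

    /** Hypothetical stand-in for the bounded buffer zip keeps per input. */
    static final class BoundedInput {
        private final Queue<Long> queue = new ArrayDeque<>();
        private final int capacity;

        BoundedInput(int capacity) {
            this.capacity = capacity;
        }

        /** Fails when the producer pushes more than the buffer can hold. */
        void onNext(long value) {
            if (queue.size() == capacity) {
                throw new IllegalStateException("buffer full: missing backpressure");
            }
            queue.add(value);
        }

        boolean hasRoom() {
            return queue.size() < capacity;
        }
    }

    /** Pushes ticks with no strategy; returns the tick that overflowed, or -1 if none did. */
    static long pushUnbounded(BoundedInput input, long ticks) {
        for (long t = 0; t < ticks; t++) {
            try {
                input.onNext(t);
            } catch (IllegalStateException overflow) {
                return t;
            }
        }
        return -1;
    }

    /** Pushes ticks through a drop strategy; returns how many ticks were discarded. */
    static long pushWithDrop(BoundedInput input, long ticks) {
        long dropped = 0;
        for (long t = 0; t < ticks; t++) {
            if (input.hasRoom()) {
                input.onNext(t);
            } else {
                dropped++; // discard the tick instead of overflowing the buffer
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // prints "overflow at tick 4"
        System.out.println("overflow at tick " + pushUnbounded(new BoundedInput(4), 10));
        // prints "dropped ticks 6"
        System.out.println("dropped ticks " + pushWithDrop(new BoundedInput(4), 10));
    }
}
```

In this model, an operator placed after the buffer never gets a chance to act, because the overflow happens before any value reaches it. That matches the stack trace in the question, where the exception is raised inside OperatorZip while the interval timer is emitting.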
dwursteisen answered Oct 13 '22 01:10


The answer above from @dwursteisen and @zsxwing is correct.

The interval operator emits based on time, so it is "hot" and does not support backpressure. It keeps emitting regardless of demand and fills the internal bounded buffer of zip, which causes the MissingBackpressureException.

When dealing with a "hot" source (such as one based on time, or user events) you must choose the strategy for how to deal with overflow.

In this case, you would need to put that strategy on the interval operator.

Here is code showing what's going on and options for dealing with it:

import java.util.concurrent.TimeUnit;

import rx.Observable;


public class ZipInterval {

    public static void main(String... args) {
        Observable<Long> slowHotSource = Observable.interval(1, TimeUnit.SECONDS);

        /** This one is fast and hot so will cause a MissingBackpressureException.
         *  
         * This is because a "hot" source based on time does not obey backpressure
         * and keeps emitting regardless of what the downstream asks for.
         * 
         * Examples of "hot" and "cold" and approaches to both can be found at:
         * https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=90 and
         * https://github.com/ReactiveX/RxJava/wiki/Backpressure
         * */ 
        // Observable<Long> fastHotSource = Observable.interval(1, TimeUnit.MILLISECONDS);

        /**
         * The following version of 'fastHotSource' composes a simple flow control strategy.
         */
        Observable<Long> fastHotSource = Observable.interval(1, TimeUnit.MILLISECONDS).onBackpressureDrop();

        Observable<String> zipped = Observable.zip(slowHotSource, fastHotSource, (s, f) -> {
            return s + " " + f;
        });

        // subscribe to the output
        System.out.println("---- zip output");
        zipped.take(10).toBlocking().forEach(System.out::println);

        /**
         * The outcome of the above is probably not what is expected though. 
         * 
         * This is because zip will buffer the output and then `fastHotSource` will drop until
         * the zip buffer asks for more.
         * 
         * For temporal or "hot" sources like this, using `withLatestFrom` or `combineLatest`
         * is often more appropriate than `zip`.
         */

        Observable<String> latest = slowHotSource.withLatestFrom(fastHotSource, (s, f) -> {
            return s + " " + f;
        });

        // subscribe to the output
        System.out.println("---- latest output");
        latest.take(10).toBlocking().forEach(System.out::println);
    }
}

The output of this is:

---- zip output
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
---- latest output
0 1002
1 2002
2 3000
3 4001
4 5003
5 6001
6 7000
7 8002
8 9005
9 10000
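
The difference between the two outputs can be reproduced without timers. The following is a rough plain-Java model under stated assumptions, not RxJava code: virtual time advances in 1 ms steps, the fast source emits a counter on every step, the slow source emits every `slowPeriod` steps, and a hypothetical buffer of `capacity` fast values models what zip holds while everything else is dropped. The names `zipPairs` and `latestPairs` and the capacity of 128 are illustrative only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class ZipVersusLatestSketch {

    /** Models zip with a drop strategy: pairs by position, from a bounded buffer. */
    static List<String> zipPairs(int count, int slowPeriod, int capacity) {
        List<String> out = new ArrayList<>();
        Queue<Long> buffered = new ArrayDeque<>();
        long fast = 0;
        long slow = 0;
        for (long t = 1; out.size() < count; t++) {
            // The fast source ticks on every step; the value is dropped if the buffer is full.
            if (buffered.size() < capacity) {
                buffered.add(fast);
            }
            fast++;
            // The slow source ticks every slowPeriod steps and takes the oldest buffered value.
            if (t % slowPeriod == 0) {
                out.add(slow + " " + buffered.remove());
                slow++;
            }
        }
        return out;
    }

    /** Models withLatestFrom: pairs each slow value with the most recent fast value. */
    static List<String> latestPairs(int count, int slowPeriod) {
        List<String> out = new ArrayList<>();
        long fast = 0;
        long slow = 0;
        for (long t = 1; out.size() < count; t++) {
            long latest = fast++; // only the newest fast value is remembered
            if (t % slowPeriod == 0) {
                out.add(slow + " " + latest);
                slow++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(zipPairs(3, 1000, 128));  // [0 0, 1 1, 2 2]
        System.out.println(latestPairs(3, 1000));    // [0 999, 1 1999, 2 2999]
    }
}
```

In this model the zip pairs stay at "0 0", "1 1", ... because the buffer filled with the first fast values long before the slow source emitted, whereas the latest pairs track the current fast counter. Real timers jitter, which is why the measured values above are close to, but not exactly, multiples of 1000.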
benjchristensen answered Oct 13 '22 01:10