
RxJava flatMap and backpressure strange behavior

While writing a data synchronization job with RxJava, I discovered a strange behavior that I cannot explain. I'm fairly new to RxJava and would appreciate help.

Briefly, my job is quite simple: I have a list of element IDs, I call a web service to get each element by ID, do some processing, and make multiple calls to push the data to a DB. Data loading is faster than data storing, so I encountered OutOfMemory errors.

My code looks much like the "failing" test below, but while running some tests I realized that removing the line:

flatMap(dt -> Observable.just(dt))

makes it work. The failing test's output clearly shows that unconsumed items stack up, which leads to OutOfMemory. The working test's output shows that the producer always waits for the consumer, so it never leads to OutOfMemory.

public static class DataStore {
    public Integer myVal;
    public byte[] myBigData;

    public DataStore(Integer myVal) {
        this.myVal = myVal;
        this.myBigData = new byte[1000000];
    }
}

@Test
public void working() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    AtomicInteger nbUnconsumed = new AtomicInteger(0);

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
        .toBlocking().forEach(s -> {});

    logger.info("Finished");
}

@Test
public void failing() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    AtomicInteger nbUnconsumed = new AtomicInteger(0);

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
        .flatMap(dt -> Observable.just(dt))
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
        .toBlocking().forEach(s -> {});

    logger.info("Finished");
}

private Observable<DataStore> produce(final int value) {
    return Observable.<DataStore>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(200); // Stands in for a synchronous web-service call that retrieves the data
                s.onNext(new DataStore(value));
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

private Observable<Boolean> consume(DataStore value) {
    return Observable.<Boolean>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(1000); // Stands in for a synchronous DB call that stores the data
                s.onNext(true);
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onNext(false);
            s.onCompleted();
        }
    }).subscribeOn(Schedulers.io());
}

What is the explanation behind this behavior? How can I fix my failing test without removing the Observable.just(dt), which in my real case is an Observable.from(someListOfItme)?

benjamin.donze asked Feb 16 '26 07:02



1 Answer

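By default, flatMap merges an unlimited number of sources. By applying that specific lambda without a maxConcurrent parameter, you have effectively unbounded the upstream, which can now run at full speed and overwhelm the internal buffers of the other operators. With a maxConcurrent limit, flatMap requests only that many items from upstream at a time, which is why the working test keeps the producer in step with the consumer.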
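
To make the mechanism concrete, here is a minimal plain-Java sketch (no RxJava dependency) of how upstream demand affects buffering. It is a simplified, single-threaded model and not RxJava's actual implementation: the class `DemandModel`, the method `peakBuffered`, and the tick-based timing are hypothetical names and assumptions chosen for illustration. An unbounded stage is modeled as requesting `Long.MAX_VALUE` items, while a bounded stage requests a fixed number and replenishes one per consumed item.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DemandModel {

    /**
     * Simulates a fast producer (at most one item per tick) feeding a slow
     * consumer (one item every ticksPerConsume ticks) and returns the peak
     * number of items waiting in the buffer between them.
     *
     * upstreamDemand models what the middle stage requests from upstream:
     * Long.MAX_VALUE stands for "unbounded", any other value for a
     * maxConcurrent-style limit (assumption made for this sketch).
     */
    static int peakBuffered(long upstreamDemand, int totalItems, int ticksPerConsume) {
        Deque<Integer> buffer = new ArrayDeque<>();
        long demand = upstreamDemand;
        int produced = 0;
        int consumed = 0;
        int peak = 0;
        int tick = 0;

        while (consumed < totalItems) {
            tick++;

            // The producer emits only while there is outstanding demand.
            if (produced < totalItems && demand > 0) {
                buffer.add(produced++);
                if (demand != Long.MAX_VALUE) {
                    demand--;
                }
            }
            peak = Math.max(peak, buffer.size());

            // The slow consumer drains one item every ticksPerConsume ticks.
            if (tick % ticksPerConsume == 0 && !buffer.isEmpty()) {
                buffer.poll();
                consumed++;
                // A bounded stage asks for one more item per item consumed.
                if (demand != Long.MAX_VALUE) {
                    demand++;
                }
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        // prints "unbounded demand, peak buffered: 801"
        System.out.println("unbounded demand, peak buffered: "
                + peakBuffered(Long.MAX_VALUE, 1000, 5));
        // prints "demand of 2, peak buffered: 2"
        System.out.println("demand of 2, peak buffered: "
                + peakBuffered(2, 1000, 5));
    }
}
```

In the RxJava 1.x code from the question, the analogous change would be to give the pass-through step a concurrency limit as well, for example `flatMap(dt -> Observable.just(dt), MAX_CONCURRENT_LOAD)`, using the same two-argument `flatMap` overload that the `produce` and `consume` steps already use. This suggestion has not been run against the asker's real job, so treat the limit value as something to tune.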

akarnokd answered Feb 18 '26 21:02



