I'm trying to create a lazy-Stream with Project Reactor in Californium-SR10.
According to the Javadoc for Flux.toIterable():
Transform this Flux into a lazy Iterable blocking on Iterator.next() calls.
Consequently, I have tried the following:
AtomicInteger generatedElements = new AtomicInteger(0);
Flux<Integer> source = Flux
    .range(0, 10)
    .doOnRequest(req -> System.out.println("Requested " + req))
    .doOnRequest(req -> generatedElements.addAndGet((int) req))
    .limitRate(2)
    .subscribeOn(Schedulers.elastic());
Iterator<Integer> l = source.toIterable().iterator();
assertThat(l.next()).isEqualTo(0);
assertThat(l.next()).isEqualTo(1);
assertThat(l.next()).isEqualTo(2);
Thread.sleep(2000);
assertThat(generatedElements.get()).isEqualTo(4);
Which gives me the following surprising result:
Requested 2
Requested 2
Requested 2
Requested 2
Requested 2
Requested 2
org.junit.ComparisonFailure: expected:<[4]> but was:<[12]>
Expected :4
Actual   :12
Do you have any explanation for what is happening here (and how to fix it)?
Why is doOnRequest() called with 2?
doOnRequest() fires whenever the subscriber requests new elements from upstream, with the number of elements requested as its parameter. Since you've limited the rate to 2, you'd expect it to be called with 2 as the parameter every time, which is exactly what happens.
Why is the Flux not consumed lazily?
Well, actually it is, just not in the way you expect. Lazy doesn't necessarily mean "one at a time"; it means the Flux is evaluated in batches as they're called for, rather than always being evaluated in full in one go.
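Specifically, note that limitRate() only governs the requests made upstream of it; it doesn't automatically limit what the iterator itself asks for. The iterator has a separate batch size that you have to specify as a parameter to the toIterable() method. The no-argument toIterable() uses a default batch size (Queues.SMALL_BUFFER_SIZE, 256 unless configured otherwise), which is larger than the 10 elements in your range, so the whole Flux gets requested in chunks of 2 almost immediately.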
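To make the "lazy in batches" idea concrete, here is a minimal sketch using only the JDK. It is not Reactor's implementation: BatchedIterator, generatedAfter and the batch sizes are hypothetical names and values chosen for illustration, and the model is synchronous and only refills when its buffer is empty. Reactor's iterator replenishes ahead of time, so its exact counts differ (for example 4 rather than 3 with a batch size of 1).

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchedIteratorSketch {

    /** Hypothetical iterator over a range that refills its buffer batchSize elements at a time. */
    static final class BatchedIterator implements Iterator<Integer> {
        private final Queue<Integer> buffer = new ArrayDeque<>();
        private final int batchSize;
        private final int end;                  // exclusive upper bound of the source range
        private final AtomicInteger generated;  // counts elements actually produced
        private int nextValue;

        BatchedIterator(int start, int count, int batchSize, AtomicInteger generated) {
            this.nextValue = start;
            this.end = start + count;
            this.batchSize = batchSize;
            this.generated = generated;
        }

        /** Produces at most batchSize elements from the source range. */
        private void refill() {
            for (int i = 0; i < batchSize && nextValue < end; i++) {
                buffer.add(nextValue++);
                generated.incrementAndGet();
            }
        }

        @Override
        public boolean hasNext() {
            if (buffer.isEmpty()) {
                refill(); // only touch the source when the buffer has run dry
            }
            return !buffer.isEmpty();
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return buffer.poll();
        }
    }

    /** Calls next() the given number of times on a 0..9 source and returns how many elements were generated. */
    static int generatedAfter(int batchSize, int calls) {
        AtomicInteger generated = new AtomicInteger();
        Iterator<Integer> it = new BatchedIterator(0, 10, batchSize, generated);
        for (int i = 0; i < calls; i++) {
            it.next();
        }
        return generated.get();
    }

    public static void main(String[] args) {
        System.out.println(generatedAfter(256, 3)); // prints 10: one large batch drains the whole source
        System.out.println(generatedAfter(2, 3));   // prints 4: two batches of 2
        System.out.println(generatedAfter(1, 3));   // prints 3: one element per next() call
    }
}
```

In this model, three next() calls with a large batch size generate all 10 elements, which mirrors why a large iterator batch size makes the source look eager even though it is only answering one big request.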
You can set both the limitRate() prefetch and the toIterable() batch size to 1, and then you're likely to get only 4 elements generated (4 rather than 3, because the iterator always keeps at least one additional element ready for the following next() call):
AtomicInteger generatedElements = new AtomicInteger(0);
Flux<Integer> source = Flux
        .range(0, 10)
        .limitRate(1)
        .doOnRequest(req -> System.out.println("Requested " + req))
        .doOnRequest(req -> generatedElements.addAndGet((int) req));
Iterator<Integer> l = source.toIterable(1).iterator();
assertThat(l.next()).isEqualTo(0);
assertThat(l.next()).isEqualTo(1);
assertThat(l.next()).isEqualTo(2);
assertThat(generatedElements.get()).isEqualTo(4);
However, note that even this isn't guaranteed in all circumstances, as it just depends on when the backpressure is applied, and when the Flux responds to it.