Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why the tryAdvance of stream.spliterator() may accumulate items into a buffer?

Getting a Spliterator from a Stream pipeline may return an instance of a StreamSpliterators.WrappingSpliterator. For example, getting the following Spliterator:

Spliterator<String> source = new Random()
            .ints(11, 0, 7) // size, origin, bound
            .filter(nr -> nr % 2 != 0)
            .mapToObj(Integer::toString)
            .spliterator();

Given the above Spliterator<String> source, when we traverse the elements individually through the tryAdvance (Consumer<? super P_OUT> consumer) method of Spliterator, which in this case is an instance of StreamSpliterators.WrappingSpliterator, it will first accumulate items into an internal buffer, before consuming those items, as we can see in StreamSpliterators.java#298. From a simple point of view, the doAdvance() inserts items first into buffer and then it gets the next item and pass it to consumer.accept (…).

public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
    boolean hasNext = doAdvance();
    if (hasNext)
        consumer.accept(buffer.get(nextToConsume));
    return hasNext;
}

However, I am not figuring out the need of this buffer.

In this case, why the consumer parameter of the tryAdvance is not simply used as a terminal Sink of the pipeline?

like image 789
Miguel Gamboa Avatar asked Oct 31 '17 13:10

Miguel Gamboa


People also ask

What does a Spliterator do?

Spliterators can provide an estimate of the number of remaining elements via the estimateSize() method. Ideally, as reflected in characteristic SIZED , this value corresponds exactly to the number of elements that would be encountered in a successful traversal.

What is the use of Spliterator in Java?

Like Iterator and ListIterator, Spliterator is a Java Iterator, which is used to iterate elements one-by-one from a List implemented object. Some important points about Java Spliterator are: Java Spliterator is an interface in Java Collection API.

Which of the following methods are provided by Spliterator interface trySplit?

Java Spliterator Features It provides tryAdvance() method to iterate elements individually in different threads. It helps in parallel processing. To iterate elements sequentially in a single Thread, use forEachRemaining() method. The trySplit() method is used partition the spliterator, if it is possible.

What is StreamSupport in Java?

public final class StreamSupport extends Object. Low-level utility methods for creating and manipulating streams. This class is mostly for library writers presenting stream views of data structures; most static stream methods intended for end users are in the various Stream classes.


4 Answers

Keep in mind that this is the Spliterator returned by the public method Stream.spliterator(), so no assumptions about the caller can be made (as long as it is within the contract).

The tryAdvance method may get called once for each of the stream’s elements and once more to detect the end of the stream, well, actually, it might get called an arbitrary number of times even after hitting the end. And there is no guaranty that the caller will always pass the same consumer.

To pass a consumer directly to the source spliterator without buffering, you will have to compose a consumer that will perform all pipeline stages, i.e. call a mapping function and use its result or test a predicate and not call the downstream consumer if negative and so on. The consumer passed to the source spliterator would also be responsible to notify the WrappingSpliterator somehow about a value being rejected by the filter as the source spliterator’s tryAdvance method still returns true in that case and the operation would have to be repeated then.

As Eugene correctly mentioned, this is the one-fits-all implementation that doesn’t consider how many or what kind of pipeline stages are there. The costs of composing such a consumer could be heavy and might have to be reapplied for each tryAdvance call, read for every stream element, e.g. when different consumers are passed to tryAdvance or when equality checks do not work. Keep in mind that consumers are often implemented as lambda expressions and the identity or equality of the instances produced by lambda expressions is unspecified.

So the tryAdvance implementation avoids these costs by composing only one consumer instance on the first invocation that will always store the element into the same buffer, also allocated on the first invocation, if not rejected by a filter. Note that under normal circumstances, the buffer will only hold one element. Afaik, flatMap is the only operation that may push more elements to the buffer. But note that the existence of this non-lazy behavior of flatMap is also the reason why this buffering strategy is required, at least when flatMap is involved, to ensure that the Spliterator implementation handed out by a public method will fulfill the contract of passing at most one element to the consumer during one invocation of tryAdvance.

In contrast, when you call forEachRemaining, these problems do not exist. There is only one Consumer instance during the entire operation and the non-laziness of flatMap doesn’t matter either, as all elements will get consumed anyway. Therefore, a non-buffering transfer will be attempted, as long as no previous tryAdvance call was made that could have caused buffering of some elements:

     public void forEachRemaining(Consumer<? super P_OUT> consumer) {
         if (buffer == null && !finished) {
             Objects.requireNonNull(consumer);
             init();

             ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);
             finished = true;
         }
         else {
             do { } while (tryAdvance(consumer));
         }
     }

As you can see, as long as the buffer has not been initialized, i.e. no previous tryAdvance call was made, consumer::accept is bound as Sink and a complete direct transfer made.

like image 165
Holger Avatar answered Nov 15 '22 00:11

Holger


I mostly agree with great @Holger answer, but I would put accents differently. I think it is hard for you to understand the need for a buffer because you have very simplistic mental model of what Stream API allows. If one thinks about Stream as a sequence of map and filter, there is no need for additional buffer because those operations have 2 important "good" properties:

  1. Work on one element at a time
  2. Produce 0 or 1 element as a result

However those are not true in general case. As @Holger (and I in my original answer) mentioned there is already flatMap in Java 8 that breaks rule #2 and in Java 9 they've finally added takeWhile that actually transforms on whole Stream -> Stream rather than on a per-element basis (and that is AFAIK the first intermediate shirt-circuiting operation).

Another point I don't quite agree with @Holger is that I think that the most fundamental reason is a bit different than the one he puts in the second paragraph (i.e. a) that you may call tryAdvance post the end of the Stream many times and b) that "there is no guaranty that the caller will always pass the same consumer"). I think that the most important reason is that Spliterator being functionally identical to Stream has to support short-circuiting and laziness (i.e. ability to not process the whole Stream or else it can't support unbound streams). In other words, even if Spliterator API (quite strangely) required that you must use the same Consumer object for all calls of all methods for a given Spliterator, you would still need tryAdvance and that tryAdvance implementation would still have to use some buffer. You just can't stop processing data if all you've got is forEachRemaining(Consumer<? super T> ) so you can't implement anything similar to findFirst or takeWhile using it. Actually this is one of the reasons why inside JDK implementation uses Sink interface rather than Consumer (and what "wrap" in wrapAndCopyInto stands for): Sink has additional boolean cancellationRequested() method.

So to sum up: a buffer is required because we want Spliterator:

  1. To use simple Consumer that provides no means to report back end of processing/cancellation
  2. To provide means to stop processing of the data by a request of the (logical) consumer.

Note that those two are actually slightly contradictory requirements.

Example and some code

Here I'd like to provide some example of code that I believe is impossible to implement without additional buffer given current API contract (interfaces). This example is based on your example.

There is simple Collatz sequence of integers that is conjectured to always eventually hit 1. AFAIK this conjecture is not proved yet but is verified for many integers (at least for whole 32-bit int range).

So assume that the problem we are trying to solve is following: from a stream of Collatz sequences for random start numbers in range from 1 to 1,000,000 find the first that contains "123" in its decimal representation.

Here is a solution that uses just Stream (not a Spliterator):

static String findGoodNumber() {
    return new Random()
            .ints(1, 1_000_000) // unbound!
            .flatMap(nr -> collatzSequence(nr))
            .mapToObj(Integer::toString)
            .filter(s -> s.contains("123"))
            .findFirst().get();
}

where collatzSequence is a function that returns Stream containing the Collatz sequence until the first 1 (and for nitpickers let it also stop when current value is bigger than Integer.MAX_VALUE /3 so we don't hit overflow).

Every such Stream returned by collatzSequence is bound. Also standard Random will eventually generate every number in the provided range. It means that we are guaranteed that there eventually will be some "good" number in the stream (for example just 123) and findFirst is short-circuiting so the whole operation will actually terminate. However no reasonable Stream API implementation can predict this.

Now let's assume that for some strange reason you want to perform the same thing using intermediate Spliterator. Even though you have only one piece of logic and no need for different Consumers, you can't use forEachRemaining. So you'll have to do something like this:

static Spliterator<String> createCollatzRandomSpliterator() {
    return new Random()
            .ints(1, 1_000_000) // unbound!
            .flatMap(nr -> collatzSequence(nr))
            .mapToObj(Integer::toString)
            .spliterator();
}

static String findGoodNumberWithSpliterator() {
    Spliterator<String> source = createCollatzRandomSpliterator();

    String[] res = new String[1]; // work around for "final" closure restriction

    while (source.tryAdvance(s -> {
        if (s.contains("123")) {
            res[0] = s;
        }
    })) {
        if (res[0] != null)
            return res[0];
    }
    throw new IllegalStateException("Impossible");
}

It is also important that for some starting numbers the Collatz sequence will contain several matching numbers. For example, both 41123 and 123370 (= 41123*3+1) contain "123". It means that we really don't want our Consumer to be called post the first matching hit. But since Consumer doesn't expose any means to report end of processing, WrappingSpliterator can't just pass our Consumer to the inner Spliterator. The only solution is to accumulate all results of inner flatMap (with all the post-processing) into some buffer and then iterate over that buffer one element at a time.

like image 41
SergGr Avatar answered Nov 14 '22 23:11

SergGr


Spliterators are designed to handle sequential processing of each item in encounter order, and parallel processing of items in some order. Each method of the Spliterator must be able to support both early binding and late binding. The buffering is intended to gather data into suitable, processable chunks, that follow the requirements for ordering, parallelization and mutability.

In other words, tryAdvance() is not the only method in the class, and other methods have to work with each other to deliver the external contract. To do that, in the face of sub-classes that may override some or all of the methods, requires that each method obey its internal contract.

like image 40
Bob Dalgleish Avatar answered Nov 15 '22 00:11

Bob Dalgleish


This is something that I've read from Holger quite in a few posts and I'll just sum it up here; if there's a certain exact duplicate (I'll try to find one) - I will close and delete my answer in respect to that one.

First, is why WrappingSpliterator are needed in the first place - for stateful operations like sorted, distinct, etc - but I think you already understood that. I assume for flatMap also - since it is eager.

Now, when you call spliterator, IFF there are no stateful operations there is no real reason to wrap that into a WrappingSpliterator obviously, but at the moment this is not done. This could be changed in a future release - where they can detect if there are stateful operations before you call spliterator; but they don't do that now and simply treat every operation as stateful, thus wrapping it into WrappingSpliterator

like image 29
Eugene Avatar answered Nov 14 '22 23:11

Eugene