
Internal changes for limit and unordered stream

This came up while I was trying to answer another question. Consider this code:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

AtomicInteger i = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
IntStream.generate(() -> i.incrementAndGet())
        .parallel()
        .peek(x -> count.incrementAndGet())
        .limit(5)
        .forEach(System.out::println);

System.out.println("count = " + count);

I understand that IntStream#generate produces an unordered infinite stream, so a short-circuiting operation (limit in this case) is required for it to finish. I also understand that the Stream implementation is free to call the Supplier as many times as it likes before it reaches that limit.

Running this under Java 8 always prints count = 512 (maybe not always, but it does on my machine).

In contrast, running this under Java 10 rarely yields a count above 5. So my question is: what changed internally to make the short-circuiting so much better? (I am trying to answer this on my own by getting the sources and doing some diffs...)

Eugene asked Apr 27 '18 14:04





1 Answer

The change happened somewhere between Java 9, beta 103 and Java 9, beta 120 (JDK‑8154387).

The responsible class is StreamSpliterators.UnorderedSliceSpliterator.OfInt, or rather its superclass StreamSpliterators.UnorderedSliceSpliterator.

The old version of the class looked like this:

abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
    }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of CHUNK_SIZE
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

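As we can see, each spliterator attempts to buffer up to CHUNK_SIZE = 1 << 7 (128) elements before it asks for permits, which may add up to “number of CPU cores” × 128 elements overall. That would match the 512 you observed if four spliterators each filled one chunk.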
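To make the arithmetic concrete, here is a simplified, single-threaded model of that buffer-then-acquire pattern. It is not JDK code: the class ChunkModel, the method simulate and the "all workers fill their buffer before anyone acquires permits" schedule are assumptions chosen to show a worst case.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ChunkModel {
    // Hypothetical model: `workers` spliterators share one permit counter.
    // In each round every worker first fills a buffer of `chunkSize` elements
    // (pulling from the source), and only then asks for permits.
    // Returns {elements pulled from the source, elements passed downstream}.
    static long[] simulate(int workers, int chunkSize, long limit) {
        AtomicLong permits = new AtomicLong(limit);
        long pulled = 0;
        long emitted = 0;
        while (permits.get() > 0) {
            // Phase 1: every worker sees permits > 0 and buffers a full chunk.
            long[] buffered = new long[workers];
            for (int w = 0; w < workers; w++) {
                buffered[w] = chunkSize;
                pulled += chunkSize;
            }
            // Phase 2: each worker claims as many permits as are still left.
            for (int w = 0; w < workers; w++) {
                long granted = Math.min(buffered[w], permits.get());
                permits.addAndGet(-granted);
                emitted += granted;
            }
        }
        return new long[] {pulled, emitted};
    }

    public static void main(String[] args) {
        long[] r = simulate(4, 128, 5);
        // prints "pulled=512 emitted=5"
        System.out.println("pulled=" + r[0] + " emitted=" + r[1]);
    }
}
```

In this model, four workers with a fixed chunk of 128 pull 512 elements from the source to deliver 5, which is the kind of overshoot the peek counter in the question makes visible. A real parallel run interleaves differently, so the actual count can be lower.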

In contrast, the new version looks like this:

abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    protected final int chunkSize;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
            ((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
        this.chunkSize = parent.chunkSize;
    }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of chunkSize
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(chunkSize);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

So now there is an instance field chunkSize. When there is a defined limit and the expression ((skip + limit) / AbstractTask.LEAF_TARGET) + 1 evaluates to a value smaller than CHUNK_SIZE, that smaller value is used. So with small limits, chunkSize is much smaller. In your case, with a limit of 5 and no skip, the integer division yields 0 whenever LEAF_TARGET is greater than 5, so the chunk size is 1.
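The expression can be tried out in isolation. The following is a minimal sketch, not the JDK source: ChunkSizeSketch and its chunkSize method are hypothetical names, and the leaf target is passed in as a parameter because AbstractTask is not accessible outside java.util.stream. The value 16 assumes that LEAF_TARGET is four times the common pool parallelism and that the parallelism is 4.

```java
public class ChunkSizeSketch {
    static final int CHUNK_SIZE = 1 << 7;

    // Same expression as in the constructor quoted above, with the leaf
    // target supplied by the caller instead of read from AbstractTask.
    static int chunkSize(long skip, long limit, int leafTarget) {
        return limit >= 0
                ? (int) Math.min(CHUNK_SIZE, ((skip + limit) / leafTarget) + 1)
                : CHUNK_SIZE;
    }

    public static void main(String[] args) {
        // Assumed leaf target of 16 (parallelism 4, shifted left by 2).
        System.out.println(chunkSize(0, 5, 16));       // 1
        System.out.println(chunkSize(0, 1000, 16));    // 63
        System.out.println(chunkSize(0, 100000, 16));  // 128 (capped at CHUNK_SIZE)
        System.out.println(chunkSize(0, -1, 16));      // 128 (no limit)
        // With a leaf target of 4, a limit of 5 gives 5 / 4 + 1 = 2.
        System.out.println(chunkSize(0, 5, 4));        // 2
    }
}
```

The last call shows why the chunk size of 1 depends on the leaf target being larger than skip + limit: only then does the integer division round down to 0.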

Holger answered Sep 22 '22 00:09
