This came up while I was trying to answer another question. Consider this code:
```java
AtomicInteger i = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
IntStream.generate(() -> i.incrementAndGet())
        .parallel()
        .peek(x -> count.incrementAndGet())
        .limit(5)
        .forEach(System.out::println);
System.out.println("count = " + count);
```
I understand that `IntStream#generate` produces an unordered infinite stream, and that for it to finish there has to be a short-circuiting operation (`limit` in this case). I also understand that the `Supplier` is free to be called as many times as the Stream implementation likes before it reaches that limit.

Running this under Java 8 prints `count = 512` (maybe not always, but it does on my machine).

In contrast, under Java 10 the count rarely exceeds 5. So my question is: what changed internally so that the short-circuiting works so much better? (I am trying to answer this on my own by getting the sources and doing some diffs...)
The `limit` method of the `Stream` interface, introduced in Java 8, lets the developer cap the number of elements taken from a stream. It is useful when you only want to process the initial elements of a stream.
The `limit(long maxSize)` method of `IntStream` returns a stream consisting of the elements of this stream, truncated to be no longer than `maxSize` in length; `maxSize` is the number of elements the stream is limited to.
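A minimal sketch of `limit` truncating an infinite, ordered `IntStream`, using only the standard `IntStream.iterate`, `limit` and `toArray` methods; the names `LimitDemo` and `firstN` are made up for illustration:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class LimitDemo {
    // Returns the first maxSize elements of the infinite sequence 1, 2, 3, ...
    static int[] firstN(long maxSize) {
        return IntStream.iterate(1, x -> x + 1) // infinite, ordered, sequential source
                .limit(maxSize)                 // short-circuiting: keeps at most maxSize elements
                .toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(firstN(5))); // prints [1, 2, 3, 4, 5]
    }
}
```

Without the `limit` call, `toArray()` on this source would never return.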
Stream operations are divided into intermediate (`Stream`-producing) operations and terminal (value- or side-effect-producing) operations. Intermediate operations are always lazy.
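A small sketch of that laziness, assuming a sequential, ordered pipeline; `LazinessDemo` and `run` are hypothetical names. Building the pipeline runs nothing, and `peek` is only invoked once the terminal operation starts pulling elements:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class LazinessDemo {
    // Returns {peek calls before the terminal op, sum, peek calls after the terminal op}.
    static int[] run() {
        AtomicInteger peeked = new AtomicInteger();

        // Building the pipeline executes nothing: peek and limit are intermediate (lazy) operations.
        IntStream pipeline = IntStream.iterate(0, x -> x + 1)
                .peek(x -> peeked.incrementAndGet())
                .limit(3);
        int before = peeked.get();

        // The terminal operation sum() is what actually pulls elements through the pipeline.
        int sum = pipeline.sum(); // 0 + 1 + 2
        return new int[] {before, sum, peeked.get()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("before = " + r[0] + ", sum = " + r[1] + ", after = " + r[2]);
    }
}
```

How many times `peek` runs once the terminal operation starts is an implementation detail, which is exactly what the question above is probing.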
A stream can be ordered or unordered. An ordered stream keeps the encounter order of its elements. The Streams API can convert an ordered stream, which may represent an ordered data source such as a list or a sorted set, into an unordered stream.
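A minimal sketch of what dropping the ordering constraint means for `limit`, using the standard `unordered()` method; `UnorderedDemo` and its method names are hypothetical. On an ordered stream `limit(5)` must return the first five elements, while after `unordered()` any five elements are a valid result:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class UnorderedDemo {
    // Ordered source: limit(5) must return the first five elements in encounter order,
    // even though the stream is parallel.
    static int[] orderedFirstFive() {
        return IntStream.range(0, 1_000).parallel().limit(5).toArray();
    }

    // unordered() removes the ordering constraint, so limit(5) may return
    // any five elements of the stream.
    static int[] unorderedAnyFive() {
        return IntStream.range(0, 1_000).parallel().unordered().limit(5).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(orderedFirstFive())); // prints [0, 1, 2, 3, 4]
        System.out.println(Arrays.toString(unorderedAnyFive())); // five elements, not necessarily 0..4
    }
}
```

This freedom is what lets the unordered slicing code discussed below hand out elements to whichever spliterator gets there first.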
The change happened somewhere between Java 9 beta 103 and Java 9 beta 120 (JDK-8154387).

The responsible class is `StreamSpliterators.UnorderedSliceSpliterator.OfInt`, or more precisely its superclass `StreamSpliterators.UnorderedSliceSpliterator`.
The old version of the class looked like this:

```java
abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
    }

    …

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        ArrayBuffer.OfRef<T> sb = null;
        PermitStatus permitStatus;
        while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
            if (permitStatus == PermitStatus.MAYBE_MORE) {
                // Optimistically traverse elements up to a threshold of CHUNK_SIZE
                if (sb == null)
                    sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
                else
                    sb.reset();
                long permitsRequested = 0;
                do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
                if (permitsRequested == 0)
                    return;
                sb.forEach(action, acquirePermits(permitsRequested));
            }
            else {
                // Must be UNLIMITED; let 'er rip
                s.forEachRemaining(action);
                return;
            }
        }
    }
```
As you can see, each spliterator first buffers up to `CHUNK_SIZE = 1 << 7` (128) elements and only then acquires permits, so the source may be asked for as many as “number of CPU cores” × 128 elements.
In contrast, the new version looks like this:

```java
abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    protected final int chunkSize;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
                ((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
        this.chunkSize = parent.chunkSize;
    }

    …

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        ArrayBuffer.OfRef<T> sb = null;
        PermitStatus permitStatus;
        while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
            if (permitStatus == PermitStatus.MAYBE_MORE) {
                // Optimistically traverse elements up to a threshold of chunkSize
                if (sb == null)
                    sb = new ArrayBuffer.OfRef<>(chunkSize);
                else
                    sb.reset();
                long permitsRequested = 0;
                do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                if (permitsRequested == 0)
                    return;
                sb.forEach(action, acquirePermits(permitsRequested));
            }
            else {
                // Must be UNLIMITED; let 'er rip
                s.forEachRemaining(action);
                return;
            }
        }
    }
```
So now there is an instance field `chunkSize`. When there is a defined limit and the expression `((skip + limit) / AbstractTask.LEAF_TARGET) + 1` evaluates to a smaller value than `CHUNK_SIZE`, that smaller value is used. So with small limits, `chunkSize` will be much smaller. In your case, with a limit of 5 and no skip, the expression is `5 / AbstractTask.LEAF_TARGET + 1`, which is 1 whenever `LEAF_TARGET` is greater than 5, so each spliterator buffers just one element before acquiring a permit.
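The new chunk-size computation can be reproduced outside the JDK as a quick check. This sketch copies the expression from the constructor above; `ChunkSizeCalc` is a hypothetical name, and because `AbstractTask.LEAF_TARGET` is not accessible from user code, it is passed in as a parameter. In `main` it is estimated as `ForkJoinPool.getCommonPoolParallelism() << 2`, which is assumed (not guaranteed) to match how OpenJDK defines `LEAF_TARGET`:

```java
import java.util.concurrent.ForkJoinPool;

public class ChunkSizeCalc {
    static final int CHUNK_SIZE = 1 << 7; // 128, as in the JDK source quoted above

    // Same expression as the newer UnorderedSliceSpliterator constructor;
    // a negative limit means "unlimited" and falls back to CHUNK_SIZE.
    // leafTarget stands in for AbstractTask.LEAF_TARGET.
    static int chunkSize(long skip, long limit, int leafTarget) {
        return limit >= 0
                ? (int) Math.min(CHUNK_SIZE, ((skip + limit) / leafTarget) + 1)
                : CHUNK_SIZE;
    }

    public static void main(String[] args) {
        // Assumption: LEAF_TARGET = common-pool parallelism * 4, as in OpenJDK's AbstractTask.
        int leafTarget = ForkJoinPool.getCommonPoolParallelism() << 2;
        System.out.println("LEAF_TARGET (estimated) = " + leafTarget);
        System.out.println("chunkSize for limit 5      = " + chunkSize(0, 5, leafTarget));
        System.out.println("chunkSize for limit 10_000 = " + chunkSize(0, 10_000, leafTarget));
    }
}
```

For example, with a `leafTarget` of 16 a limit of 5 gives `5 / 16 + 1 = 1`, while a limit of 10,000 gives `625 + 1 = 626`, which is capped back to 128; a `leafTarget` of 4 would give a chunk size of 2 for the same limit of 5.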