I found some surprising behavior with Java parallel streams. I made my own Spliterator, and the resulting parallel stream gets divided up until each stream has only one element in it. That seems way too small, and I wonder what I'm doing wrong. I'm hoping there are some characteristics I can set to correct this.
Here's my test code. The Float here is just a dummy payload; my real stream class is somewhat more complicated.
```java
public static void main( String[] args ) {
    TestingSpliterator splits = new TestingSpliterator( 10 );
    Stream<Float> test = StreamSupport.stream( splits, true );
    double total = test.mapToDouble( Float::doubleValue ).sum();
    System.out.println( "Total: " + total );
}
```
This code will continually split this stream until each Spliterator has only one element. That seems way too much to be efficient.
Output:

```
run:
Split on count: 10
Split on count: 5
Split on count: 3
Split on count: 5
Split on count: 2
Split on count: 2
Split on count: 3
Split on count: 2
Split on count: 2
Total: 5.164293184876442
BUILD SUCCESSFUL (total time: 0 seconds)
```
Here's the code of the Spliterator. My main concern is what characteristics I should be using, but perhaps there's a problem somewhere else?
```java
import java.util.Spliterator;
import java.util.function.Consumer;

public class TestingSpliterator implements Spliterator<Float> {
    int count;
    int splits;

    public TestingSpliterator( int count ) {
        this.count = count;
    }

    @Override
    public boolean tryAdvance( Consumer<? super Float> cnsmr ) {
        if( count > 0 ) {
            cnsmr.accept( (float)Math.random() );
            count--;
            return true;
        } else
            return false;
    }

    @Override
    public Spliterator<Float> trySplit() {
        System.err.println( "Split on count: " + count );
        if( count > 1 ) {
            splits++;
            int half = count / 2;
            // The new spliterator takes the upper part; this one keeps half
            TestingSpliterator newSplit = new TestingSpliterator( count - half );
            count = half;
            return newSplit;
        } else
            return null;
    }

    @Override
    public long estimateSize() {
        return count;
    }

    @Override
    public int characteristics() {
        return IMMUTABLE | SIZED;
    }
}
```
So how can I get the stream to be split into much larger chunks? I was hoping something in the neighborhood of 10,000 to 50,000 elements would be better.
I know I can return null from the trySplit() method, but that seems like a backwards way of doing it. It seems like the system should have some notion of the number of cores, the current load, and how complex the code that uses the stream is, and adjust itself accordingly. In other words, I want the stream chunk size to be externally configured, not internally fixed by the stream itself.
EDIT: Regarding Holger's answer below: when I increase the number of elements in the original stream, the stream is split somewhat less finely, so StreamSupport does stop splitting eventually.
At an initial stream size of 100 elements, StreamSupport stops splitting when it reaches a stream size of 2 (the last line I see on my screen is Split on count: 4).
And for an initial stream size of 1000 elements, the final size of the individual stream chunks is about 32 elements.
Edit part deux: After looking at the output of the above, I changed my code to list out the individual Spliterators created. Here are the changes:
```java
public static void main( String[] args ) {
    TestingSpliterator splits = new TestingSpliterator( 100 );
    Stream<Float> test = StreamSupport.stream( splits, true );
    double total = test.mapToDouble( Float::doubleValue ).sum();
    // testers is a static field of TestingSpliterator
    System.out.println( "Total Spliterators: " + TestingSpliterator.testers.size() );
    for( TestingSpliterator t : TestingSpliterator.testers ) {
        System.out.println( "Splits: " + t.splits );
    }
}
```
And to the TestingSpliterator's ctor:
```java
static Queue<TestingSpliterator> testers = new ConcurrentLinkedQueue<>();

public TestingSpliterator( int count ) {
    this.count = count;
    testers.add( this ); // OUCH! 'this' escape
}
```
The result of this code is that the first Spliterator gets split 5 times. The next Spliterator gets split 4 times. The next set of Spliterators gets split 3 times, and so on. The result is that 36 Spliterators get made, and the stream is split into as many parts. On typical desktop systems, this seems to be what the API considers best for parallel operations.
I'm going to accept Holger's answer below, which is essentially that the StreamSupport class is doing the right thing: don't worry, be happy. Part of the issue for me was that I was doing my early testing on very small stream sizes, and I was surprised at the number of splits. Don't make the same mistake yourself.
Parallel streams. Any stream in Java can easily be transformed from sequential to parallel. We can achieve this by calling the parallel() method on a sequential stream, or by creating a stream with the parallelStream() method of a collection: List<Integer> listOfNumbers = Arrays.
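Both ways of obtaining a parallel stream can be sketched as follows. The class `ParallelCreationDemo`, its methods and the sample numbers are illustrative only; `Stream.parallel()`, `Collection.parallelStream()` and `BaseStream.isParallel()` are standard JDK methods.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ParallelCreationDemo {

    /** Illustrative sample data. */
    static List<Integer> numbers() {
        return Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
    }

    /** Option 1: turn an existing sequential stream into a parallel one. */
    static Stream<Integer> viaParallel(List<Integer> list) {
        return list.stream().parallel();
    }

    /** Option 2: ask the collection for a parallel stream directly. */
    static Stream<Integer> viaParallelStream(List<Integer> list) {
        return list.parallelStream();
    }

    public static void main(String[] args) {
        List<Integer> list = numbers();
        System.out.println(viaParallel(list).isParallel());
        System.out.println(viaParallelStream(list).isParallel());
        // An associative reduction gives the same result as the sequential version
        System.out.println(viaParallelStream(list).reduce(0, Integer::sum));
    }
}
```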
Like Iterator and ListIterator, Spliterator is used to traverse elements one by one, but it can also partition its source, and it is not limited to List implementations. Java Spliterator is an interface in the Java Collections API (package java.util), introduced in Java 8.
A parallel stream takes advantage of all available CPU cores and processes tasks in parallel. If there are more tasks than cores, the remaining tasks wait until a running task completes.
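The amount of parallelism available to a parallel stream can be inspected as sketched below. Parallel streams run on the common `ForkJoinPool`; in OpenJDK its default parallelism is the number of available processors minus one (at least 1), and it can be overridden with the system property `java.util.concurrent.ForkJoinPool.common.parallelism`. As far as I know, the thread that starts the terminal operation also helps with the work, which is why one fewer pool worker than cores is the default. The class name `CommonPoolInfo` is hypothetical.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    /** Number of processors the JVM reports. */
    static int cores() {
        return Runtime.getRuntime().availableProcessors();
    }

    /** Target parallelism of the common pool used by parallel streams. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    " + cores());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```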
Splitting is done by the trySplit() operation, which returns a new Spliterator; see the API documentation for the exact requirements of this method. When the contents of (part of) the stream are consumed in bulk through the Spliterator, the forEachRemaining(action) operation is called.
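A single split followed by bulk consumption of both parts might look like this. The class `SplitOnceDemo` and method `splitOnce` are hypothetical. For an `ORDERED` source such as a list, the `Spliterator` contract requires the split-off part to be a prefix; where exactly the list is cut is an implementation detail (OpenJDK's array-backed spliterators cut at the midpoint).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SplitOnceDemo {

    /** Splits a list's spliterator once and drains both parts with forEachRemaining. */
    static List<List<String>> splitOnce(List<String> source) {
        Spliterator<String> rest = source.spliterator();
        Spliterator<String> prefix = rest.trySplit(); // null if the source declines to split

        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        if (prefix != null) {
            prefix.forEachRemaining(first::add); // bulk traversal of the split-off part
        }
        rest.forEachRemaining(second::add);      // bulk traversal of what remains
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "c", "d", "e", "f");
        List<List<String>> parts = splitOnce(letters);
        System.out.println("Split-off part: " + parts.get(0));
        System.out.println("Remaining part: " + parts.get(1));
    }
}
```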
Switching to a parallel stream does not change the intended behaviour of the pipeline: the output is still determined by the operations (filters, mappings, and so on) applied in it. Parallel streams are part of the functional-style Stream API introduced in Java 8.
The Java Spliterator interface is an internal iterator that breaks the stream into smaller parts, which can then be processed in parallel. In real-life programming, we may never need to use Spliterator directly. In ordinary sequential use, it behaves much like a Java Iterator.
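The resemblance to an Iterator can be seen by traversing the same list both ways. `tryAdvance` combines `hasNext()` and `next()`: it hands the next element to the given action and returns `false` once nothing is left. The class `TryAdvanceDemo` and its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;

public class TryAdvanceDemo {

    /** Classic external iteration with hasNext()/next(). */
    static List<String> withIterator(List<String> source) {
        List<String> out = new ArrayList<>();
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    /** The same traversal with a Spliterator: tryAdvance() checks for and consumes one element. */
    static List<String> withTryAdvance(List<String> source) {
        List<String> out = new ArrayList<>();
        Spliterator<String> sp = source.spliterator();
        while (sp.tryAdvance(out::add)) {
            // empty body: the action passed to tryAdvance does the work
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("one", "two", "three");
        System.out.println(withIterator(words));
        System.out.println(withTryAdvance(words));
    }
}
```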
Spliterator trySplit(): if this spliterator can be partitioned, returns a Spliterator covering elements that will, upon return from this method, not be covered by this Spliterator.
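Spliterators also describe themselves through characteristics(), which the stream framework uses when deciding how to process a source. The sketch below prints the flags reported by three standard collections; `CharacteristicsDemo` and `describe` are illustrative names, while `hasCharacteristics` and the flag constants are part of `java.util.Spliterator`. The JDK documentation states that `ArrayList` reports `SIZED`, `SUBSIZED` and `ORDERED`, `HashSet` reports `SIZED` and `DISTINCT`, and `TreeSet` reports `SIZED`, `DISTINCT`, `SORTED` and `ORDERED`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;

public class CharacteristicsDemo {

    /** Lists the characteristic flags a spliterator reports. */
    static String describe(Spliterator<?> sp) {
        StringBuilder sb = new StringBuilder();
        if (sp.hasCharacteristics(Spliterator.ORDERED))    sb.append("ORDERED ");
        if (sp.hasCharacteristics(Spliterator.DISTINCT))   sb.append("DISTINCT ");
        if (sp.hasCharacteristics(Spliterator.SORTED))     sb.append("SORTED ");
        if (sp.hasCharacteristics(Spliterator.SIZED))      sb.append("SIZED ");
        if (sp.hasCharacteristics(Spliterator.SUBSIZED))   sb.append("SUBSIZED ");
        if (sp.hasCharacteristics(Spliterator.IMMUTABLE))  sb.append("IMMUTABLE ");
        if (sp.hasCharacteristics(Spliterator.NONNULL))    sb.append("NONNULL ");
        if (sp.hasCharacteristics(Spliterator.CONCURRENT)) sb.append("CONCURRENT ");
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(3, 1, 2);
        System.out.println("ArrayList: " + describe(new ArrayList<>(values).spliterator()));
        System.out.println("HashSet:   " + describe(new HashSet<>(values).spliterator()));
        System.out.println("TreeSet:   " + describe(new TreeSet<>(values).spliterator()));
    }
}
```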
You are looking at it from the wrong angle. The implementation did not split “until each spliterator has one element”; rather, it split “until having ten spliterators”.
A single spliterator instance can only be processed by one thread. A spliterator is not required to support splitting after its traversal has been started. Therefore any splitting opportunity that has not been used beforehand may lead to limited parallel processing capabilities afterwards.
It’s important to keep in mind that the Stream implementation received a ToDoubleFunction with an unknown workload¹. It doesn’t know that it is as simple as Float::doubleValue in your case. It could be a function taking a minute to evaluate, and then having a spliterator per CPU core would be exactly right. Even having more spliterators than CPU cores is a valid strategy to handle the possibility that some evaluations take significantly longer than others.
A typical number of initial spliterators will be “number of CPU cores” × 4, though there might be more split operations later when more knowledge about the actual workload exists. When your input data has fewer elements than that, it’s not surprising that it gets split down until one element per spliterator is left.
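The “CPU cores × 4” heuristic can be made concrete with a small model. This sketch is based on my reading of OpenJDK's internal `java.util.stream.AbstractTask` (its `LEAF_TARGET` is the common-pool parallelism × 4, and a task keeps splitting while its size estimate exceeds max(rootSize / LEAF_TARGET, 1)); these are internals that may change between versions. `SplitModel` and its methods are hypothetical, and the halving rule copies the question's `TestingSpliterator`. Assuming a common-pool parallelism of 7 (an 8-core machine), the model reproduces the figures reported in the question: ten one-element pieces for 10 elements, 36 pieces for 100 elements, and pieces of about 32 elements for 1000.

```java
public class SplitModel {

    /** Size threshold: max(rootSize / (parallelism * 4), 1), as assumed above. */
    static long threshold(long rootSize, int parallelism) {
        long target = (long) parallelism << 2; // "number of cores x 4"
        long est = rootSize / target;
        return est > 0 ? est : 1;
    }

    /** Number of pieces produced when every piece larger than the threshold is split again. */
    static int leaves(long size, long threshold) {
        if (size <= threshold) {
            return 1;
        }
        long kept = size / 2;      // TestingSpliterator keeps count / 2 ...
        long given = size - kept;  // ... and hands the rest to the new spliterator
        return leaves(kept, threshold) + leaves(given, threshold);
    }

    /** Size of the largest piece under the same rule. */
    static long largestLeaf(long size, long threshold) {
        if (size <= threshold) {
            return size;
        }
        long kept = size / 2;
        return Math.max(largestLeaf(kept, threshold), largestLeaf(size - kept, threshold));
    }

    public static void main(String[] args) {
        int parallelism = 7; // assumption: 8 cores, common pool parallelism = cores - 1
        for (long n : new long[] {10, 100, 1_000, 10_000}) {
            long t = threshold(n, parallelism);
            System.out.println(n + " elements -> threshold " + t + ", "
                    + leaves(n, t) + " pieces, largest " + largestLeaf(n, t));
        }
    }
}
```

Under the same assumptions, 10,000 elements also end up in 32 pieces (of about 312 elements each), which matches the observation that the number of splits stops growing once there are enough chunks.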
You may try with new TestingSpliterator( 10000 ) or 1000 or 100 to see that the number of splits will not change significantly once the implementation assumes it has enough chunks to keep all CPU cores busy.
Since your spliterator does not know anything about the per-element workload of the consuming stream either, you shouldn’t be concerned about this. If you can smoothly support splitting down to single elements, just do that.
¹ It doesn’t have special optimizations for the case that no operations have been chained, though.
Unless I am missing the obvious, you could always pass a bufferSize in the constructor and use that for your trySplit:
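```java
int bufferSize;

// Second constructor that takes the chunk size
public TestingSpliterator( int count, int bufferSize ) {
    this.count = count;
    this.bufferSize = bufferSize;
}

@Override
public Spliterator<Float> trySplit() {
    // Split off a fixed-size chunk only while more than bufferSize elements remain
    if( count > bufferSize ) {
        splits++;
        count = count - bufferSize;
        return new TestingSpliterator( bufferSize, bufferSize );
    }
    return null;
}
```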
And with this:
```java
TestingSpliterator splits = new TestingSpliterator(12, 5);
Stream<Float> test = StreamSupport.stream(splits, true);
test.map(x -> new AbstractMap.SimpleEntry<>(
        x.doubleValue(),
        Thread.currentThread().getName()))
    .collect(Collectors.groupingBy(
        Map.Entry::getValue,
        Collectors.mapping(
            Map.Entry::getKey,
            Collectors.toList())))
    .forEach((x, y) -> System.out.println("Thread : " + x + " processed : " + y));
```
You would see that there are 3 threads: two of them process 5 elements and one processes 2 (depending on scheduling, a single thread may pick up more than one chunk).