I'm new to the Stream API in Java 8 and to functional programming in general, but not new to Java. I am interested in knowing and understanding how the Stream API selects an execution plan.
How does it know which parts to parallelize and which parts not to? How many types of execution plans even exist?
Basically, I want to know why Streams in Java 8 help in making things faster, and also how they do some of this "magic".
I couldn't find much literature about how it all works.
Streams are pull-based. Only a terminal operation (like collect) will cause items to be consumed. Conceptually this means that collect asks for an item from limit, limit asks map, map asks filter, and filter asks the stream source. This conforms to your first printout.
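This pull order can be observed directly by tracing each stage. A minimal sketch (the class name PullDemo and the trace list are my own, for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class PullDemo {
    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        int result = Stream.of(1, 2, 3, 4)
                .filter(n -> { trace.add("filter " + n); return n % 2 == 0; })
                .map(n -> { trace.add("map " + n); return n * 10; })
                .limit(1)
                .findFirst()
                .get();
        // Elements flow through one at a time: 1 is rejected by filter,
        // 2 passes filter and map, then limit(1) stops pulling. Elements
        // 3 and 4 are never touched.
        System.out.println(trace);   // [filter 1, filter 2, map 2]
        System.out.println(result);  // 20
    }
}
```

Note that the trace never mentions 3 or 4: the terminal operation stops pulling as soon as it has what it needs.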
Introduced in Java 8, the Stream API is used to process collections of objects. A stream is a sequence of objects that supports various methods which can be pipelined to produce the desired result. A stream is not a data structure; instead, it takes its input from collections, arrays, or I/O channels.
So how does it work internally? It's actually pretty simple. Java uses the trySplit method to try to split the collection into chunks that can be processed by different threads. In terms of the execution plan, a parallel stream works very similarly to a sequential one, with that splitting as the main difference.
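You can call trySplit yourself to see the partitioning that parallel streams rely on. A minimal sketch (the class name SplitDemo is my own; I use an ArrayList, whose spliterator splits in half and hands the prefix to the new spliterator):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SplitDemo {
    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        Spliterator<Integer> right = data.spliterator();
        // trySplit() carves off a chunk for another thread to process;
        // the original spliterator keeps the remaining elements.
        Spliterator<Integer> left = right.trySplit();
        left.forEachRemaining(n -> System.out.print(n + " "));   // 1 2 3 4
        System.out.println();
        right.forEachRemaining(n -> System.out.print(n + " "));  // 5 6 7 8
    }
}
```

A parallel stream keeps splitting like this until the chunks are small enough, then processes them on the common ForkJoinPool.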
The filter() method allows us to pick a stream of elements that satisfy a predicate.
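For instance (the class name FilterDemo is my own):

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterDemo {
    public static void main(String[] args) {
        List<Integer> evens = List.of(1, 2, 3, 4, 5, 6).stream()
                .filter(n -> n % 2 == 0)          // keep only elements satisfying the predicate
                .collect(Collectors.toList());
        System.out.println(evens);  // [2, 4, 6]
    }
}
```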
This question is a bit broad to explain in detail, but I will try my best to answer it satisfactorily. I will use the example of a Stream over an ArrayList.
When we create a stream, the returned object is a ReferencePipeline. This is the "default stream" object, so to speak, since it does not carry any functionality yet. Now we have to distinguish between lazy and eager methods, so let's look at one example of each.
Example one: the filter(Predicate<?>) method
The filter() method is declared as follows:
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
As you can see, it returns a StatelessOp object, which is basically a new ReferencePipeline in which filter evaluation is now 'enabled'. In other words: every time we add a new 'functionality' to the stream, a new pipeline stage is created on top of the old one, with the proper operation flags and method overrides.
As you may already know, streams are not evaluated until an eager (terminal) operation is called. So we need an eager method to evaluate the stream.
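This laziness is easy to observe: building the pipeline touches no data, and each intermediate op returns a new stage. A small sketch (class name LazyDemo is my own):

```java
import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        Stream<Integer> source = Stream.of(1, 2, 3);
        Stream<Integer> filtered = source.filter(n -> {
            System.out.println("testing " + n);
            return n > 1;
        });
        // filter() returned a new pipeline stage wrapping the old one...
        System.out.println(source == filtered);        // false
        // ...and the predicate has not run yet.
        System.out.println("nothing evaluated yet");
        // Only the terminal operation drives the evaluation.
        long count = filtered.count();
        System.out.println("count = " + count);        // count = 2
    }
}
```

"testing 1" through "testing 3" are printed only after "nothing evaluated yet", when count() finally pulls the elements through.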
Example two: the forEach(Consumer<?>) method
@Override
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
At first sight this is rather short, and the evaluate() method does nothing more than call the invoke() method. Here it is important to understand what ForEachOps.makeRef() does. It sets the last flags that are necessary and creates a ForEachTask<>, which works exactly like a ForkJoinTask object. And happily Andrew found a nice paper on how they work.
Note: The exact source code can be found here.
As you might already know, the Stream API uses a Spliterator and a ForkJoinPool to perform parallel computations. A Spliterator is used for traversing and partitioning sequences of elements, while the ForkJoinPool framework recursively breaks a task into smaller independent sub-tasks until they are simple enough to be executed asynchronously.
As an example of how a parallel computation framework, such as the java.util.stream package, would use Spliterator and ForkJoinPool in a parallel computation, here is one way to implement an associated parallel forEach that illustrates the primary idiom:
public static void main(String[] args) {
List<Integer> list = new SplittableRandom()
.ints(24, 0, 100)
.boxed().collect(Collectors.toList());
parallelEach(list, System.out::println);
}
static <T> void parallelEach(Collection<T> c, Consumer<T> action) {
Spliterator<T> s = c.spliterator();
long batchSize = s.estimateSize() / (ForkJoinPool.getCommonPoolParallelism() * 8);
    new ParallelEach<>(null, s, action, batchSize).invoke(); // invoke the task
}
The Fork Join Task:
static class ParallelEach<T> extends CountedCompleter<Void> {
final Spliterator<T> spliterator;
final Consumer<T> action;
final long batchSize;
ParallelEach(ParallelEach<T> parent, Spliterator<T> spliterator,
Consumer<T> action, long batchSize) {
super(parent);
this.spliterator = spliterator;
this.action = action;
this.batchSize = batchSize;
}
// The main computation performed by this task
@Override
public void compute() {
Spliterator<T> sub;
while (spliterator.estimateSize() > batchSize &&
(sub = spliterator.trySplit()) != null) {
addToPendingCount(1);
new ParallelEach<>(this, sub, action, batchSize).fork();
}
spliterator.forEachRemaining(action);
propagateCompletion();
}
}
Original source.
Also, keep in mind that parallel computation is not always faster than sequential computation, and you always have a choice - see When to use parallel stream.
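Switching is just a matter of calling parallel() on the pipeline; the result is the same either way, and only the cost differs. A minimal sketch (class name ParallelChoice is my own):

```java
import java.util.stream.IntStream;

public class ParallelChoice {
    public static void main(String[] args) {
        long sequential = IntStream.rangeClosed(1, 1_000)
                .asLongStream().sum();
        long parallel = IntStream.rangeClosed(1, 1_000)
                .asLongStream().parallel().sum();
        // Both produce the same result; parallel() only pays off when the
        // per-element work or the data set is large enough to amortize the
        // splitting and fork/join overhead.
        System.out.println(sequential == parallel);  // true (500500 each)
    }
}
```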