 

How does the Java Stream API select an execution plan?

I am new to the Stream API in Java 8 and to functional programming in general, but not new to Java. I am interested in knowing and understanding how the Stream API selects an execution plan.

How does it know which parts to parallelize and which parts not to? How many types of execution plans even exist?

Basically, I want to know why Streams in Java 8 help make things faster, and how some of this "magic" happens.

I couldn't find much literature about how it all works.

ng.newbie asked Apr 18 '18 10:04


People also ask

How are Java streams executed?

Streams are pull-based. Only a terminal operation (like collect) causes items to be consumed. Conceptually, this means that collect asks for an item from limit, limit asks map, map asks filter, and filter asks the stream source.
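A minimal sketch of this pull behaviour (the class and variable names here are my own, not from the JDK): peek() records each element the moment it is actually pulled, and nothing runs until the terminal findFirst() starts pulling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazyDemo {
    static final List<String> pulled = new ArrayList<>();

    // returns the first even element; 'pulled' records which elements were consumed
    static int firstEven() {
        return Stream.of(1, 2, 3, 4, 5)
            .peek(n -> pulled.add("pulled " + n)) // runs only when an element is pulled
            .filter(n -> n % 2 == 0)
            .findFirst()                          // terminal operation: starts the pulling
            .orElseThrow();
    }

    public static void main(String[] args) {
        System.out.println(firstEven()); // 2
        System.out.println(pulled);      // [pulled 1, pulled 2] -- 3, 4, 5 are never touched
    }
}
```

Because the pipeline pulls on demand, elements 3, 4 and 5 are never consumed at all.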

How does Java stream API work?

Introduced in Java 8, the Stream API is used to process collections of objects. A stream is a sequence of objects that supports various methods which can be pipelined to produce the desired result. A stream is not a data structure; instead, it takes its input from Collections, Arrays or I/O channels.

How does Java stream work internally?

So how does it work internally? It's actually pretty simple. Java uses the trySplit method to try to split the collection into chunks that can be processed by different threads. In terms of the execution plan, it works very similarly, with one main difference.
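You can watch trySplit do this chunking by hand. The sketch below (my own example, not JDK internals) splits an array-backed spliterator once; the array-backed implementation hands off roughly the first half and keeps the rest, which is exactly the kind of chunk a parallel stream would fork as a sub-task.

```java
import java.util.Arrays;
import java.util.Spliterator;

public class SplitDemo {
    // splits a spliterator once and reports (size handed off, size kept)
    static long[] splitSizes() {
        Spliterator<Integer> whole = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8).spliterator();
        Spliterator<Integer> firstHalf = whole.trySplit(); // hands off the first half
        return new long[] { firstHalf.estimateSize(), whole.estimateSize() };
    }

    public static void main(String[] args) {
        long[] s = splitSizes();
        System.out.println(s[0] + " / " + s[1]); // 4 / 4
    }
}
```

Note that the split strategy depends on the source: array-backed spliterators halve cheaply, while iterator-backed ones split into growing batches, which is one reason parallel performance varies by source type.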

Which method of Java stream API allows us to pick a stream of elements that satisfy a predicate?

The filter() method allows us to pick a stream of elements that satisfy a predicate.
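For instance, a minimal sketch of filter() with a predicate (names are my own):

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterDemo {
    // keeps only the elements for which the predicate returns true
    static List<Integer> evens(List<Integer> in) {
        return in.stream()
                 .filter(n -> n % 2 == 0)   // the predicate
                 .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evens(List.of(1, 2, 3, 4, 5))); // [2, 4]
    }
}
```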


2 Answers

This question is a bit broad to explain in detail, but I will try my best to answer it satisfactorily. I will use the example of a Stream over an ArrayList.

When we create a stream, the returned object is called a ReferencePipeline. This object is the "default stream" object, so to speak, since it does not feature any functionality yet. Now we have to distinguish between lazy and eager methods. So let's take a look at one example of each.
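You can observe this stage-building directly: each intermediate operation returns a brand-new pipeline object linked to its upstream stage, rather than mutating the original stream. (A small sketch of my own, not JDK code.)

```java
import java.util.List;
import java.util.stream.Stream;

public class PipelineDemo {
    // each intermediate operation returns a new pipeline stage object
    static boolean sameObject() {
        Stream<Integer> source = List.of(1, 2, 3).stream();   // head ReferencePipeline
        Stream<Integer> filtered = source.filter(n -> n > 1); // new stage wrapping the head
        return source == filtered;
    }

    public static void main(String[] args) {
        System.out.println(sameObject()); // false: filter() built a new stage
    }
}
```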

Example one: The filter(Predicate<?>) method:

The filter() method is declared as follows:

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

As you can see, it returns a StatelessOp object, which is basically a new ReferencePipeline in which filter evaluation is now 'enabled'. In other words: every time we add a new 'functionality' to the stream, a new pipeline is created based on the old one, with the proper operation flags and method overrides.
As you may already know, streams are not evaluated until an eager operation is called, so we need an eager method to evaluate the stream.
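The chaining that opWrapSink builds can be sketched with plain consumers. This is a hypothetical, stripped-down stand-in for Sink.ChainedReference (the names filterStage and SinkSketch are my own): each stage wraps its downstream consumer, so pushing an element through the chain applies every operation in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class SinkSketch {
    // a simplified analogue of the Sink.ChainedReference built by filter's opWrapSink:
    // forward an element downstream only if the predicate accepts it
    static <T> Consumer<T> filterStage(Predicate<T> p, Consumer<T> downstream) {
        return t -> { if (p.test(t)) downstream.accept(t); };
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        Consumer<Integer> chain = filterStage(n -> n % 2 == 0, out::add);
        List.of(1, 2, 3, 4).forEach(chain); // push elements through the chain
        System.out.println(out); // [2, 4]
    }
}
```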

Example two: The forEach(Consumer<?>) method:

@Override
public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

At first glance this is rather short, and the evaluate() method does nothing more than call the invoke() method. Here it is important to understand what ForEachOps.makeRef() does. It sets the last flags that are necessary and creates a ForEachTask<>, which works exactly the same way as a ForkJoinTask object. And happily Andrew found a nice paper on how they work.


Note: The exact source code can be found here.

L.Spillner answered Oct 18 '22 15:10


As you might already know, the Stream API uses a Spliterator and the ForkJoinPool to perform parallel computations. A Spliterator is used for traversing and partitioning sequences of elements, while the ForkJoinPool framework recursively breaks the task into smaller independent sub-tasks until they are simple enough to be executed asynchronously.

As an example of how a parallel computation framework, such as the java.util.stream package, would use Spliterator and ForkJoinPool in a parallel computation, here is one way to implement the associated parallel forEach, illustrating the primary idiom:

public static void main(String[] args) {
    List<Integer> list = new SplittableRandom()
        .ints(24, 0, 100)
        .boxed().collect(Collectors.toList());

    parallelEach(list, System.out::println);
}

static <T> void parallelEach(Collection<T> c, Consumer<T> action) {
    Spliterator<T> s = c.spliterator();
    long batchSize = s.estimateSize() / (ForkJoinPool.getCommonPoolParallelism() * 8);
    new ParallelEach<>(null, s, action, batchSize).invoke(); // invoke the root task
}

The Fork Join Task:

static class ParallelEach<T> extends CountedCompleter<Void> {
    final Spliterator<T> spliterator;
    final Consumer<T> action;
    final long batchSize;

    ParallelEach(ParallelEach<T> parent, Spliterator<T> spliterator,
                 Consumer<T> action, long batchSize) {
        super(parent);
        this.spliterator = spliterator;
        this.action = action;
        this.batchSize = batchSize;
    }

    // The main computation performed by this task
    @Override
    public void compute() {
        Spliterator<T> sub;
        while (spliterator.estimateSize() > batchSize &&
              (sub = spliterator.trySplit()) != null) {
            addToPendingCount(1);
            new ParallelEach<>(this, sub, action, batchSize).fork();
        }
        spliterator.forEachRemaining(action);
        propagateCompletion();
    }
}

Original source.

Also, keep in mind that a parallel computation may not always be faster than a sequential one, and you always have a choice - When to use parallel stream.
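Either mode produces the same result, so the choice is purely about cost. As a quick illustration (my own example): for a small, cheap reduction like the one below, the fork/join overhead of parallel() typically outweighs any gain, even though the answer is identical.

```java
import java.util.stream.IntStream;

public class ParallelChoice {
    public static void main(String[] args) {
        // identical results; parallel() only pays off when the source splits cheaply
        // and the per-element work is heavy enough to outweigh the fork/join overhead
        int seq = IntStream.rangeClosed(1, 1_000).sum();
        int par = IntStream.rangeClosed(1, 1_000).parallel().sum();
        System.out.println(seq + " " + par); // 500500 500500
    }
}
```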

Oleksandr Pyrohov answered Oct 18 '22 13:10