 

Java stream operation fusion and stateful intermediate operations

I have been trying to understand and showcase how Java streams implement a type of loop fusion under the hood, so that several operations can be fused into a single pass.

Here is the first example:

Stream.of("The", "cat", "sat", "on", "the", "mat")
        .filter(w -> {
            System.out.println("Filtering: " + w);
            return w.length() == 3;
        })
        .map(w -> {
            System.out.println("Mapping: " + w);
            return w.toUpperCase();
        })
        .forEach(w -> System.out.println("Printing: " + w));

It produces the following output, which makes the single pass for each element quite clear:

Filtering: The
Mapping: The
Printing: THE
Filtering: cat
Mapping: cat
Printing: CAT
Filtering: sat
Mapping: sat
Printing: SAT
Filtering: on
Filtering: the
Mapping: the
Printing: THE
Filtering: mat
Mapping: mat
Printing: MAT
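To make the single pass concrete, here is a rough hand-written equivalent of the first pipeline. It is an illustration only (the class name HandFusedLoop is made up, and this is not how the JDK is coded): one loop over the source in which each word goes through the filter, map and print steps before the next word is read. It records the same lines the stream prints.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: a hand-fused loop that mimics the first pipeline.
// Each word goes through filter, map and the terminal action before the
// next word is read, which is the single pass visible in the output.
class HandFusedLoop {
    static List<String> run(List<String> words) {
        List<String> log = new ArrayList<>();
        for (String w : words) {                   // one pass over the source
            log.add("Filtering: " + w);
            if (w.length() != 3) {
                continue;                          // rejected: later stages never see it
            }
            log.add("Mapping: " + w);
            String upper = w.toUpperCase();
            log.add("Printing: " + upper);         // stands in for forEach
        }
        return log;
    }

    public static void main(String[] args) {
        run(List.of("The", "cat", "sat", "on", "the", "mat"))
                .forEach(System.out::println);
    }
}
```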

The second example is the same, except that I insert a sorted() operation between filter and map:

Stream.of("The", "cat", "sat", "on", "the", "mat")
        .filter(w -> {
            System.out.println("Filtering: " + w);
            return w.length() == 3;
        })
        .sorted()
        .map(w -> {
            System.out.println("Mapping: " + w);
            return w.toUpperCase();
        })
        .forEach(w -> System.out.println("Printing: " + w));

This has the following output:

Filtering: The
Filtering: cat
Filtering: sat
Filtering: on
Filtering: the
Filtering: mat
Mapping: The
Printing: THE
Mapping: cat
Printing: CAT
Mapping: mat
Printing: MAT
Mapping: sat
Printing: SAT
Mapping: the
Printing: THE
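For comparison, here is a hand-written sketch that reproduces the second output, assuming a sequential stream (again an illustration with a made-up class name, not JDK code): the filter runs to completion into a buffer, the buffer is sorted, and only then are the map and print steps fused in a second loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustration only: filter -> sorted -> map -> forEach written by hand.
// sorted() cannot emit anything until it has seen the last element,
// so the work splits into two passes around a buffer.
class HandSortedPipeline {
    static List<String> run(List<String> words) {
        List<String> log = new ArrayList<>();

        // Pass 1: the stage before sorted() runs over the whole source.
        List<String> buffer = new ArrayList<>();
        for (String w : words) {
            log.add("Filtering: " + w);
            if (w.length() == 3) {
                buffer.add(w);
            }
        }

        // The barrier: natural String order puts "The" before lowercase words.
        Collections.sort(buffer);

        // Pass 2: the stages after sorted() are fused again, element by element.
        for (String w : buffer) {
            log.add("Mapping: " + w);
            log.add("Printing: " + w.toUpperCase());
        }
        return log;
    }

    public static void main(String[] args) {
        run(List.of("The", "cat", "sat", "on", "the", "mat"))
                .forEach(System.out::println);
    }
}
```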

So my question is this: with the call to sorted(), am I correct in thinking that, because it is a "stateful" intermediate operation, it does not allow each element to be processed individually in a single pass through all the operations? Furthermore, because the sorted() stateful operation needs to process the entire input stream to produce a result, the fusing technique cannot be applied across it. Is that why all the filtering occurs first, and only after the sort are the mapping and printing operations fused together? Please correct me if any of my assumptions are incorrect, and feel free to elaborate on what I have already said.

In addition, how does it decide under the hood whether it can fuse operations together into a single pass or not? For example, when the sorted() operation is present, is there simply a flag that switches fusion off, which stays on when sorted() is absent?

A final question: the benefit of fusing operations into a single pass is sometimes obvious, for example when combined with short-circuiting. But what are the main benefits of fusing together operations such as filter-map-forEach, or even filter-map-sum?

asked Jan 28 '16 18:01 by Tranquility


2 Answers

The stateless operations (map, filter, flatMap, peek, etc.) are fully fused; we build a chain of cascading Consumer objects and pour the data in. Each element can be operated upon independently of the others, so there's never anything "stuck" in the chain. (This is what Louis means by how fusion is implemented: we compose the stages into a big function, and feed the data to that.)
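A minimal sketch of what "a chain of cascading Consumer objects" can look like, using only java.util.function types. The helper names filtering and mapping are hypothetical and the JDK's internal pipeline classes differ, but the idea of each stage wrapping the consumer of the next stage is the same.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Stateless stages as cascading Consumers: each stage wraps the one after it.
class ConsumerChain {
    // Passes t downstream only if it satisfies p.
    static <T> Consumer<T> filtering(Predicate<? super T> p, Consumer<? super T> downstream) {
        return t -> {
            if (p.test(t)) {
                downstream.accept(t);
            }
        };
    }

    // Passes f(t) downstream instead of t.
    static <T, R> Consumer<T> mapping(Function<? super T, ? extends R> f,
                                      Consumer<? super R> downstream) {
        return t -> downstream.accept(f.apply(t));
    }

    static List<String> run(List<String> words) {
        List<String> out = new ArrayList<>();
        // Built back to front: the terminal action is created first.
        Consumer<String> terminal = out::add;
        Consumer<String> mapStage = mapping(String::toUpperCase, terminal);
        Consumer<String> head = filtering(w -> w.length() == 3, mapStage);
        // "Pour the data in": each word runs through the whole chain
        // before the next word is fed in.
        words.forEach(head);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("The", "cat", "sat", "on", "the", "mat")));
    }
}
```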

Stateful operations (distinct, sorted, limit, etc.) are more complicated, and vary more in their behavior. Each stateful operation gets to choose how it wants to implement itself, so it can choose the least intrusive approach possible. For example, distinct (under some circumstances) lets elements come out as they are vetted, whereas sorted is a full barrier. (The difference is in how much laziness is possible, and how well they handle things like infinite sources with a limit operation downstream.)
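The difference between distinct and sorted can be observed with peek. This sketch logs when elements enter and leave the stateful stage. On current OpenJDK sequential streams, distinct forwards each new element immediately while sorted emits nothing until the source is exhausted; this is implementation behavior rather than a documented guarantee, and parallel streams behave differently. The second method shows the infinite-source case: distinct followed by limit terminates, whereas the same pipeline with sorted would not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Observes when elements enter and leave a stateful stage (sequential stream).
class DistinctVsSorted {
    static List<String> trace(boolean useSorted) {
        List<String> log = new ArrayList<>();
        Stream<String> s = Stream.of("b", "a", "b", "c")
                .peek(w -> log.add("in " + w));    // runs just before the stateful stage
        s = useSorted ? s.sorted() : s.distinct();
        s.forEach(w -> log.add("out " + w));      // runs just after it
        return log;
    }

    // Infinite source: 0, 0, 1, 1, 2, 2, ...
    // distinct() forwards each first occurrence, so limit() can stop the
    // stream. With sorted() in place of distinct() this would not terminate,
    // because sorted() waits for the end of an infinite source.
    static List<Integer> firstDistinct(int n) {
        return Stream.iterate(0, i -> i + 1)
                .map(i -> i / 2)
                .distinct()
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(trace(false)); // distinct: "in" and "out" interleave
        System.out.println(trace(true));  // sorted: every "in" before any "out"
        System.out.println(firstDistinct(4));
    }
}
```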

It is true that stateful operations generally undermine some of the benefits of fusion, but not all of them (the operations upstream and downstream of them can still be fused).

In addition to the value of short-circuiting, which you observed, additional big wins from fusion include (a) you don't have to populate intermediate result containers between stages, and (b) the data you are dealing with is always "hot" in cache.
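A rough illustration of point (a) for a filter-map-sum pipeline. The stageByStage method is a hypothetical unfused implementation that fills two boxed intermediate lists and walks the data three times; the stream version visits each value once with no intermediate container. Both compute the same result; no performance numbers are claimed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// filter (even) -> map (square) -> sum, written two ways.
class FusionBenefit {
    // Unfused, stage by stage: two boxed intermediate lists are filled,
    // and the data is walked three times.
    static long stageByStage(int[] data) {
        List<Integer> evens = new ArrayList<>();
        for (int x : data) {
            if (x % 2 == 0) {
                evens.add(x);
            }
        }
        List<Long> squares = new ArrayList<>(evens.size());
        for (int x : evens) {
            squares.add((long) x * x);
        }
        long sum = 0;
        for (long sq : squares) {
            sum += sq;
        }
        return sum;
    }

    // Fused: a single pass with no intermediate container; each value is
    // filtered, squared and added right after it is read.
    static long fused(int[] data) {
        return IntStream.of(data)
                .filter(x -> x % 2 == 0)
                .mapToLong(x -> (long) x * x)
                .sum();
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6};
        System.out.println(stageByStage(data) + " " + fused(data));
    }
}
```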

answered Oct 25 '22 18:10 by Brian Goetz


Yes, that's about right. All of this can be checked by looking at the source code.

Fusion isn't implemented the way I think you think it is, though. There's no looking at the whole pipeline and deciding how to fuse it; there are no flags or anything. It's just a matter of whether each operation is expressed as a StatefulOp object, which can run the entire stream up to that point and get all the output, or as a StatelessOp, which just decorates a Sink that says where the elements go. You can look at the source code for, e.g., sorted and map as examples.
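A toy model of the two kinds of operation described above. MiniSink, MapSink, SortSink and MiniSinkDemo are hypothetical names; the real java.util.stream.Sink is package-private and richer (for example, its begin takes a size hint, and cancellationRequested supports short-circuiting). The stateless stage only decorates the next sink, while the stateful stage buffers in accept and replays everything in end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of the begin/accept/end protocol behind stream stages.
// All names here are hypothetical, not JDK API.
class MiniSinkDemo {
    interface MiniSink<T> {
        default void begin() {}
        void accept(T t);
        default void end() {}
    }

    // Stateless stage: decorates the downstream sink, buffers nothing.
    static class MapSink<T, R> implements MiniSink<T> {
        private final Function<? super T, ? extends R> f;
        private final MiniSink<? super R> downstream;

        MapSink(Function<? super T, ? extends R> f, MiniSink<? super R> downstream) {
            this.f = f;
            this.downstream = downstream;
        }

        public void begin() { downstream.begin(); }
        public void accept(T t) { downstream.accept(f.apply(t)); }
        public void end() { downstream.end(); }
    }

    // Stateful stage: keeps everything until end(), then sorts and replays
    // it downstream. Nothing reaches later stages before the input ends.
    static class SortSink<T extends Comparable<? super T>> implements MiniSink<T> {
        private final MiniSink<? super T> downstream;
        private final List<T> buffer = new ArrayList<>();

        SortSink(MiniSink<? super T> downstream) { this.downstream = downstream; }

        public void accept(T t) { buffer.add(t); }

        public void end() {
            buffer.sort(null);                 // null means natural ordering
            downstream.begin();
            for (T t : buffer) {
                downstream.accept(t);
            }
            downstream.end();
        }
    }

    // Roughly words.stream().sorted().map(String::toUpperCase) into a list.
    static List<String> run(List<String> words) {
        List<String> out = new ArrayList<>();
        MiniSink<String> terminal = out::add;
        MiniSink<String> head =
                new SortSink<>(new MapSink<String, String>(String::toUpperCase, terminal));
        head.begin();
        for (String w : words) {
            head.accept(w);
        }
        head.end();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("cat", "The", "mat")));
    }
}
```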

answered Oct 25 '22 18:10 by Louis Wasserman