
Stream multiple filter internals

I am trying to understand internal calls of Stream API of Java.

I have the following code, which has two filter (intermediate) operations and one terminal operation.

IntStream.of(1,2,3)
    .filter(e->e%2==0)
    .filter(e->e==2)
    .forEach(e->System.out.println(e));

Stream -> returns Stream with overridden filter -> returns Stream with overridden filter -> terminal

I see that for each intermediate operation a new stream is returned with an overridden filter method. Once the terminal method is hit, the stream executes the filters. I see that filter() is run twice when there are two filter operations, instead of once.

I want to understand how one stream traversal is able to call filter twice.

Pasting the IntPipeline code below which is hit for the filter method in Stream.

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

The filter() returns a new Stream whose predicate is set to e%2==0, and then another new Stream is returned whose predicate is e==2. Once the terminal operation is hit, for each element traversed, the predicate code is executed at line 11.
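The per-element flow described above can be observed by counting predicate invocations. This is an illustrative sketch (the counter names `firstCalls`/`secondCalls` are mine, not from the JDK):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class FilterTrace {
    public static void main(String[] args) {
        AtomicInteger firstCalls = new AtomicInteger();  // counts e % 2 == 0 tests
        AtomicInteger secondCalls = new AtomicInteger(); // counts e == 2 tests

        IntStream.of(1, 2, 3)
                .filter(e -> { firstCalls.incrementAndGet(); return e % 2 == 0; })
                .filter(e -> { secondCalls.incrementAndGet(); return e == 2; })
                .forEach(System.out::println);

        // The first predicate sees all 3 elements; the second sees only
        // the single element (2) that survived the first filter.
        System.out.println(firstCalls.get()); // 3
        System.out.println(secondCalls.get()); // 1
    }
}
```

So a single traversal pushes each element through both predicates in turn; the "filter runs twice" you observed is the two chained accept methods, not two traversals.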

Edit: I see that downstream is used to link the intermediate ops like a linked list. So are all the implementations added to the linked list as previous stages and called once traversal starts?

javaAndBeyond asked Oct 10 '20

People also ask

Can we use multiple filters in stream?

The Stream API allows chaining multiple filters. We can leverage this to satisfy complex filtering criteria. Besides, we can use Predicate.not if we want to negate a condition.
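Both points can be sketched briefly (a boxed `Stream<Integer>` is used here because `Predicate.not` works on `Predicate`, not on the primitive `IntPredicate`):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ChainedFilters {
    public static void main(String[] args) {
        Predicate<Integer> even = e -> e % 2 == 0;

        // Two chained filters: keep even numbers greater than 2.
        List<Integer> kept = Stream.of(1, 2, 3, 4, 5, 6)
                .filter(even)
                .filter(e -> e > 2)
                .collect(Collectors.toList());
        System.out.println(kept); // [4, 6]

        // Predicate.not negates a condition: keep only the odd numbers.
        List<Integer> odd = Stream.of(1, 2, 3, 4, 5, 6)
                .filter(Predicate.not(even))
                .collect(Collectors.toList());
        System.out.println(odd); // [1, 3, 5]
    }
}
```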

Can we use stream API multiple times?

A stream should be operated on (invoking an intermediate or terminal stream operation) only once. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. So the answer is no, streams are not meant to be reused.
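The "operate once" rule is easy to demonstrate; the second terminal operation on the same stream object throws:

```java
import java.util.stream.Stream;

public class StreamReuse {
    public static void main(String[] args) {
        Stream<Integer> s = Stream.of(1, 2, 3);
        s.count(); // first terminal operation consumes the stream

        try {
            s.count(); // reusing the same stream object
        } catch (IllegalStateException e) {
            // "stream has already been operated upon or closed"
            System.out.println(e.getMessage());
        }
    }
}
```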

Can stream pipeline have multiple terminal operations?

A stream can execute any number of intermediate operations, which is termed stream chaining. A stream can have only one terminal operation, it cannot be chained.

Are streams better than for loops?

If you have a small list, loops perform better. If you have a huge list, a parallel stream will perform better. Purely thinking in terms of performance, you shouldn't use a for-each loop with an ArrayList, as it creates an extra Iterator instance that you don't need (for LinkedList it's a different matter).




2 Answers

I think you are mixing up a few concepts in your understanding of Streams.

  1. Your confusion has nothing to do with the implementation of filter (or any other operation); nor is there such a concept as "overriding" an intermediate (or any other) stream operation;

    Overriding is a completely different concept, and it is related to inheritance.

  2. Every single stream is a single pipeline, which has a (1) beginning, (2) optionally an intermediate part, and (3) the end/conclusion;

  3. Intermediate operations are not overridden; instead, they form a sequential chain of operations which every element of the stream will have to go through (unless an element is discarded at some intermediate operation), in that same order.

    Think of a Stream of K objects as a pipe through which instances of K travel. That pipe has a beginning/source (where objects enter the pipe) and an end/destination; in between, however, there may be several intermediate operations which filter, transform, box, etc. those objects. Streams are lazy, meaning that intermediate operations are not performed until the terminal operation is invoked (this is a great feature of streams); so, when the terminal operation is invoked, each item goes through this pipe one at a time.
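The points above can be made concrete with `peek` (added here purely for tracing; it is not part of the question's pipeline): building the pipe prints nothing, and elements only enter it once the terminal operation runs.

```java
import java.util.stream.IntStream;

public class LazyPipeline {
    public static void main(String[] args) {
        // Building the pipeline runs no element through it yet.
        IntStream pipeline = IntStream.of(1, 2, 3)
                .peek(e -> System.out.println("entering pipe: " + e))
                .filter(e -> e % 2 == 0);

        System.out.println("pipeline built, nothing traced so far");

        // Only now does each element travel through the pipe, one at a time.
        pipeline.forEach(e -> System.out.println("left the pipe: " + e));
    }
}
```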

Additionally, read this snippet:

To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc), zero or more intermediate operations (which transform a stream into another stream, such as filter(Predicate)), and a terminal operation (which produces a result or side-effect, such as count() or forEach(Consumer)). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.

Remember the paradigm, that Stream consists of:

  1. Source operation - one that obtains a stream from some set of objects, or represents an existing stream object (such as a collection of transactions);
  2. Intermediate operations - of which there can be 0 or more, and which introduce intermediary fine-graining, adjusting or modifying operations on your stream (such as filter, sorted, and map);
  3. Terminal operation - one that terminates and closes the stream. It can produce (return) something, consume (accept) something, neither, or both (such as collect).
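The three-part paradigm in code, with plain integers standing in for the domain objects:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineParts {
    public static void main(String[] args) {
        List<Integer> result =
                Stream.of(5, 3, 4, 1, 2)               // 1. source operation
                        .filter(e -> e % 2 == 0)       // 2. intermediate: keep evens
                        .sorted()                      // 2. intermediate: order them
                        .map(e -> e * 10)              // 2. intermediate: transform
                        .collect(Collectors.toList()); // 3. terminal: close the stream

        System.out.println(result); // [20, 40]
    }
}
```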



If you are still confused (which should not be the case by now), a few more important points may shed some light on your confusion:

Intermediate operations return a new stream. They are always lazy; executing an intermediate operation such as filter() does not actually perform any filtering, but instead creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate. Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed;

Processing streams lazily allows for significant efficiencies; in a pipeline such as the filter-map-sum example above, filtering, mapping, and summing can be fused into a single pass on the data, with minimal intermediate state. Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source. (This behavior becomes even more important when the input stream is infinite and not merely large.)
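The short-circuiting described in the quote is observable with a counter (scaled down here to "find the first even number" over five elements):

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class ShortCircuit {
    public static void main(String[] args) {
        AtomicInteger examined = new AtomicInteger();

        // findFirst stops pulling elements as soon as one matches,
        // so only 1, 3 and 4 are ever examined.
        Optional<Integer> first = Stream.of(1, 3, 4, 5, 6)
                .peek(e -> examined.incrementAndGet())
                .filter(e -> e % 2 == 0)
                .findFirst();

        System.out.println(first.get());    // 4
        System.out.println(examined.get()); // 3
    }
}
```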

Giorgi Tsiklauri answered Sep 20 '22


I will try my best to explain what happens behind the scenes in the Stream API. First of all, you should change how you have been thinking about this so far and try to take in a new idea.

As a real-world example, imagine a factory (I mean a real factory in the real world, not the factory design pattern). In a factory we have some raw material and some consecutive processes, in different stages, that turn the raw material into a finished product. To get a grasp of this concept, see the following diagram:

(stage1)raw material -> (stage2) process the input and pass the output to the next stage -> (stage3) process the input and pass the output to the next stage -> .....

So the output of the first stage is the raw material, and all subsequent stages do some processing on their input and pass it along (for example, a stage can turn its input into something else, or reject it completely because of its low quality), then hand the output over to the next stage in front of it. From now on we will call these consecutive stages, altogether, a Pipeline.

What is processing? It can be anything. For instance, one stage can decide to turn its input into something completely different and pass it along (this is exactly what map provides in the Stream API); another stage may allow input to pass along only if it meets some condition (this is exactly what filter does in the Stream API).

The Java Stream API does something similar to a factory. Each Stream is exactly a pipeline, and you can add another stage to a pipeline to create a new pipeline. So when you write IntStream.of(1,2,3) you have created a pipeline, which is an IntStream. Let's break down your code:

IntStream intStream = IntStream.of(1, 2, 3);

This is the equivalent of the raw material in our factory; it is a pipeline with only one stage. However, a pipeline which only passes the raw material along has no benefit. Let's add another stage to the previous pipeline and create a new pipeline:

IntStream evenNumbePipeline = intStream.filter(e -> e%2==0);

Please note that here you create a new pipeline, and this pipeline is exactly the previous one plus another stage that lets only even numbers pass through and rejects the others. When you call the filter method, the following portion of the code creates exactly that new pipeline:

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

You can see that filter returns a new instance of StatelessOp<Integer>, which extends IntPipeline, as below:

abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> 

Let's stop here for a moment with a question: has any operation been performed so far? The answer is a BIG NO. When you create a factory, or a pipeline inside the factory, no product has been produced yet; you have to feed raw material into the pipeline to get a finished product out of the factory, and we haven't done that so far. So when we call filter and the other operations on a stream, we are just designing our pipeline's procedures and we don't really process anything; we are just adding another stage to our pipeline, saying: hey, when you are given the input, you should apply this procedure to it.
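The "designing, not processing" claim can be verified directly: a predicate with a side effect is never invoked while the pipeline is being built, only when the terminal operation starts the factory. (A counter is used here as an illustrative probe.)

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class DesignVsRun {
    public static void main(String[] args) {
        AtomicInteger predicateCalls = new AtomicInteger();

        IntStream pipeline = IntStream.of(1, 2, 3)
                .filter(e -> { predicateCalls.incrementAndGet(); return e % 2 == 0; });

        // The stage exists, but no raw material has entered the factory yet.
        System.out.println(predicateCalls.get()); // 0

        pipeline.sum(); // terminal operation: the factory starts

        System.out.println(predicateCalls.get()); // 3
    }
}
```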

In our case we add stage2 to our factory and tell it: when you are given the input, check whether it is even, and let it pass along only if it is. We are getting to your question now; let's add another stage to our pipeline:

IntStream onlyTwo = evenNumbePipeline.filter(e -> e==2);

Here you create a new pipeline which takes the previous pipeline (which is evenNumbePipeline) and adds another stage to it (evenNumbePipeline itself has not changed; we create a new pipeline which has evenNumbePipeline inside it). Let's take a look at our pipeline so far:

raw material(stage1) -> filter even number(stage2) -> filter only 2(stage3)

Think about this as the definition of the stages in our pipeline, not as operations; maybe we don't have raw material yet, but we can design our factory so that we can feed it raw material later. You can see this pipeline has three stages, and each stage does something to the previous stage's output. The pipeline will be fed raw material one element at a time (forget about parallel streams for now), so when you feed 1 as raw material to this pipeline, it goes through these stages. Each of these stages is a new object in Java.

So let's get to something you said in your question:

I want to understand how one stream traversal is able to call filter twice.

From what we have investigated so far, do you think traversal of the Stream has called the filter method twice, or did we call the filter method twice when we created our pipeline?

We call the filter method twice because we want two different stages in our pipeline. Think about a factory: we call the filter method twice because we want two different filter stages in our factory while we are designing it. We have not run the factory's stages yet, and we have not produced any finished product yet.

Let's have some fun and produce some output:

onlyTwo.forEach(e -> System.out.println(e));

As forEach is a terminal operation, it starts our factory and feeds raw material into the factory's pipeline. So, for instance, 1 goes through stage2, then stage3, and is then delivered to the forEach statement.
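You can watch the raw material travel through the stages by inserting `peek` probes between them (the probes are for tracing only; they are not part of the question's pipeline). Each element is pushed through all stages before the next element enters:

```java
import java.util.stream.IntStream;

public class StageOrder {
    public static void main(String[] args) {
        IntStream.of(1, 2, 3)
                .peek(e -> System.out.println("stage2 sees " + e))
                .filter(e -> e % 2 == 0)   // stage2: keep evens
                .peek(e -> System.out.println("stage3 sees " + e))
                .filter(e -> e == 2)       // stage3: keep only 2
                .forEach(e -> System.out.println("delivered " + e));
        // Output:
        // stage2 sees 1
        // stage2 sees 2
        // stage3 sees 2
        // delivered 2
        // stage2 sees 3
    }
}
```

Note that 1 is fully processed (and rejected by stage2) before 2 ever enters the pipe; there is one traversal of the source, not one traversal per filter.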

But there is another question: how do we define what each stage does when we are designing our pipeline?

In our example, when we were designing our pipeline, we called the filter method to create a new stage, and passed it what the filter stage should do as a parameter; the name of this parameter is predicate. The operation that is actually executed when input is received (by each stage) is defined by the opWrapSink method of each stage. Therefore we have to implement this method when we create our stage, so let's get back to our filter method, where the Stream class creates the new stage for us:

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

You can see that each stage's opWrapSink method returns a Sink. But what is a Sink?

Setting aside a lot of the complexity in this interface: it is a consumer, with an accept method as below (it also has other accept methods for primitive types, to avoid unnecessary boxing and unboxing):

void accept(T t);

When you implement this interface you define what you want to do with the value that is delivered as input to the stage. You don't need to implement this interface in your own program, because the Stream implementations have done the heavy lifting for you. Let's see how it is implemented in the filter case:

@Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
    return new Sink.ChainedInt<Integer>(sink) {
        @Override
        public void begin(long size) {
            downstream.begin(-1);
        }

        @Override
        public void accept(int t) {
            if (predicate.test(t)) ///line 11
                downstream.accept(t);
        }
    };
}

The Stream framework provides the opWrapSink method with the next stage's Sink (as the second parameter of the method call). This means we know how the next stage in our pipeline does its job (with the help of this Sink), but we have to provide it with an input, and it is crystal clear that the input to the next stage is the output of the current stage. The other parameter we need in order to produce the output of the current stage is the input to the current stage.

input to our current stage -> do operation of the current stage on the input -> pass the output to the next stages(following stages in the pipeline)

So in the accept method we have the input to our current stage as the parameter t; we should do something with this input (the operation of our current stage) and then pass it to the next stage. In our filter stage we need to check whether the input to our stage, which is t, passes a predicate (which in our case is e%2==0), and then pass it to the next stage's Sink. And that is exactly what our accept method does (downstream is exactly the Sink for the following stages in the pipeline):

@Override
public void accept(int t) {
    if (predicate.test(t)) ///line 11
        downstream.accept(t);
}

What you should notice in this accept method implementation is that it passes the input of the current stage (which is t) to the next stages only if it passes the predicate (which in our case is e%2==0); if the input does not pass the predicate, it is not passed through (exactly what we expect a filtering stage to do).
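The whole sink-chaining mechanism can be sketched in miniature. This is not the JDK's Sink interface, just a stand-in built from IntConsumer that mimics how opWrapSink wraps the downstream sink, back-to-front, and then pushes each element through the fused chain in a single pass:

```java
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;

public class MiniSinkChain {
    // A stripped-down stand-in for Sink.ChainedInt: a filtering stage
    // that forwards an element downstream only if the predicate passes.
    static IntConsumer filterStage(IntPredicate predicate, IntConsumer downstream) {
        return t -> {
            if (predicate.test(t)) {
                downstream.accept(t);
            }
        };
    }

    public static void main(String[] args) {
        // Wire the chain back-to-front, just as opWrapSink wraps the
        // terminal sink: forEach <- (e == 2) <- (e % 2 == 0).
        IntConsumer terminal = e -> System.out.println(e);
        IntConsumer chain = filterStage(e -> e % 2 == 0,
                            filterStage(e -> e == 2, terminal));

        // One pass over the source pushes each element through both filters.
        for (int e : new int[]{1, 2, 3}) {
            chain.accept(e);
        }
        // Prints only: 2
    }
}
```

This is why one traversal "calls filter twice": traversal calls the outermost accept once per element, and each accept forwards to the next stage's accept in the chain.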

Tashkhisi answered Sep 18 '22