Talking about Streams, when I execute this piece of code:
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .peek(x -> System.out.print("\nA" + x))
            .limit(3)
            .peek(x -> System.out.print("B" + x))
            .forEach(x -> System.out.print("C" + x));
    }
}
I get this output
A1B1C1
A2B2C2
A3B3C3
because limiting my stream to the first three components forces actions A, B and C to be executed only three times.
Trying to perform an analogous computation on the last three elements by using the skip() method shows a different behaviour. This:
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .peek(x -> System.out.print("\nA" + x))
            .skip(6)
            .peek(x -> System.out.print("B" + x))
            .forEach(x -> System.out.print("C" + x));
    }
}
outputs this
A1
A2
A3
A4
A5
A6
A7B7C7
A8B8C8
A9B9C9
Why, in this case, are actions A1 to A6 executed? It must have something to do with the fact that limit is a short-circuiting stateful intermediate operation, while skip is not, but I don't understand the practical implications of this property. Is it just that "every action before skip is executed while not everyone before limit is"?
Difference between limit() and skip(): the limit() method returns a stream truncated to the first N elements, whereas the skip() method returns a stream of the elements that remain after the first N have been discarded.
The limit method of the Stream class introduced in Java 8 allows the developer to limit the number of elements that will be extracted from a stream. The limit method is useful in those applications where the user wishes to process only the initial elements that occur in the stream.
The limit() method of the IntStream class is used to return a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length. Here, maxSize is the parameter.
With Java 8, the Collection interface has two methods to generate a Stream: stream() returns a sequential stream with the collection as its source, and parallelStream() returns a parallel stream with the collection as its source.
What you have here are two stream pipelines.
These stream pipelines each consist of a source, several intermediate operations, and a terminal operation.
But the intermediate operations are lazy. This means that nothing happens unless a downstream operation requires an item. When it does, then the intermediate operation does all it needs to produce the required item, and then again waits until another item is requested, and so on.
The terminal operations are usually "eager". That is, they ask for all the items in the stream that are needed for them to complete.
So you should really think of the pipeline as the forEach asking the stream behind it for the next item, and that stream asks the stream behind it, and so on, all the way to the source.
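To see the laziness for yourself, here is a minimal sketch (the class and method names are made up for illustration). It records what a peek sees before and after a terminal operation is attached:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class LazyPipelineDemo {
    // Returns {elements seen before the terminal op, elements seen after it}.
    static int[] peekCountsBeforeAndAfterTerminal() {
        List<Integer> seen = new ArrayList<>();
        // Only a source and an intermediate operation: nothing is pulled yet.
        Stream<Integer> pipeline = Stream.of(1, 2, 3).peek(seen::add);
        int before = seen.size();
        // The terminal operation is what actually pulls items through peek.
        pipeline.forEach(x -> { });
        int after = seen.size();
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = peekCountsBeforeAndAfterTerminal();
        System.out.println("before: " + counts[0] + ", after: " + counts[1]);
    }
}
```

The peek records nothing until forEach starts asking for items, and then it records all three.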
With that in mind, let's see what we have with your first pipeline:
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .peek(x -> System.out.print("\nA" + x))
    .limit(3)
    .peek(x -> System.out.print("B" + x))
    .forEach(x -> System.out.print("C" + x));
So, the forEach is asking for the first item. That means the "B" peek needs an item, and asks the limit output stream for it, which means limit will need to ask the "A" peek, which goes to the source. An item is given, and goes all the way up to the forEach, and you get your first line:
A1B1C1
The forEach asks for another item, then another. Each time, the request is propagated up the stream and performed. But when forEach asks for the fourth item and the request gets to the limit, it knows that it has already given all the items it is allowed to give.
Thus, it does not ask the "A" peek for another item. It immediately indicates that its items are exhausted, so no more actions are performed and forEach terminates.
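If you want to check this without reading console output, one option is to collect the trace into a string. This is just a sketch with a hypothetical class name; it uses the same pipeline, with prints replaced by appends to a StringBuilder:

```java
import java.util.stream.Stream;

class LimitTraceDemo {
    // Runs the limit(3) pipeline and returns the order in which A, B and C fired.
    static String trace() {
        StringBuilder log = new StringBuilder();
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .peek(x -> log.append(" A").append(x))   // "A" peek, upstream of limit
            .limit(3)
            .peek(x -> log.append("B").append(x))    // "B" peek, downstream of limit
            .forEach(x -> log.append("C").append(x));
        return log.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // A1B1C1 A2B2C2 A3B3C3
    }
}
```

No A4 appears in the trace, because limit never forwards a fourth request upstream.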
What happens in the second pipeline?
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .peek(x -> System.out.print("\nA" + x))
    .skip(6)
    .peek(x -> System.out.print("B" + x))
    .forEach(x -> System.out.print("C" + x));
Again, forEach is asking for the first item. This is propagated back. But when it gets to the skip, it knows it has to ask for 6 items from its upstream before it can pass one downstream. So it requests an item from the "A" peek upstream, consumes it without passing it downstream, makes another request, and so on. So the "A" peek gets 6 requests for an item and produces 6 prints, but these items are not passed down.
A1 A2 A3 A4 A5 A6
On the 7th request made by skip, the item is passed down to the "B" peek and from it to the forEach, so the full print is done:
A7B7C7
Then it's just like before. The skip will now, whenever it gets a request, ask for an item upstream and pass it downstream, as it "knows" it has already done its skipping job. So the rest of the prints are going through the entire pipe, until the source is exhausted.
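The same walk-through can be checked in code. This is only a sketch with a hypothetical class name; it counts how often each peek fires and records the order:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class SkipTraceDemo {
    static final AtomicInteger aCalls = new AtomicInteger();
    static final AtomicInteger bCalls = new AtomicInteger();

    // Runs the skip(6) pipeline and returns the order in which A, B and C fired.
    static String trace() {
        aCalls.set(0);
        bCalls.set(0);
        StringBuilder log = new StringBuilder();
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .peek(x -> { aCalls.incrementAndGet(); log.append(" A").append(x); })
            .skip(6)   // pulls and discards six items from the "A" peek
            .peek(x -> { bCalls.incrementAndGet(); log.append("B").append(x); })
            .forEach(x -> log.append("C").append(x));
        return log.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(trace());
        System.out.println("A fired " + aCalls.get() + " times, B fired " + bCalls.get() + " times");
    }
}
```

The "A" peek fires for all nine items, while the "B" peek fires only for the three that skip lets through.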
The fluent notation of the streamed pipeline is what's causing this confusion. Think about it this way:
limit(3)
All the pipelined operations are evaluated lazily, except forEach(), which is a terminal operation and triggers "execution of the pipeline".
When the pipeline is executed, intermediary stream definitions will not make any assumptions about what happens "before" or "after". All they're doing is take an input stream and transform it into an output stream:
Stream<Integer> s1 = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
Stream<Integer> s2 = s1.peek(x -> System.out.print("\nA" + x));
Stream<Integer> s3 = s2.limit(3);
Stream<Integer> s4 = s3.peek(x -> System.out.print("B" + x));
s4.forEach(x -> System.out.print("C" + x));
- s1 contains 9 different Integer values.
- s2 peeks at all values that pass it and prints them.
- s3 passes the first 3 values to s4 and aborts the pipeline after the third value. No further values are produced by s3. This doesn't mean that no more values are in the pipeline. s2 would still produce (and print) more values, but no one requests those values and thus execution stops.
- s4 again peeks at all values that pass it and prints them.
- forEach consumes and prints whatever s4 passes to it.

Think about it this way. The whole stream is completely lazy. Only the terminal operation actively pulls new values from the pipeline. After it has pulled 3 values from s4 <- s3 <- s2 <- s1, s3 will no longer produce new values and it will no longer pull any values from s2 <- s1. While s1 -> s2 would still be able to produce 4-9, those values are just never pulled from the pipeline, and thus never printed by s2.
skip(6)
With skip() the same thing happens:
Stream<Integer> s1 = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
Stream<Integer> s2 = s1.peek(x -> System.out.print("\nA" + x));
Stream<Integer> s3 = s2.skip(6);
Stream<Integer> s4 = s3.peek(x -> System.out.print("B" + x));
s4.forEach(x -> System.out.print("C" + x));
- s1 contains 9 different Integer values.
- s2 peeks at all values that pass it and prints them.
- s3 consumes the first 6 values, "skipping them", which means the first 6 values aren't passed to s4, only the subsequent values are.
- s4 again peeks at all values that pass it and prints them.
- forEach consumes and prints whatever s4 passes to it.

The important thing here is that s2 is not aware of the remaining pipeline skipping any values. s2 peeks at all values independently of what happens afterwards.
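To make the "each stage only sees its own input and output" idea concrete, here is a toy model built on plain Iterators. It is not how the JDK implements streams; the peek, limit and skip helpers below are hypothetical stand-ins written only to mimic the pull behaviour described above:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

class PullModelSketch {
    // Toy "peek": runs the action on every element pulled through it.
    static <T> Iterator<T> peek(Iterator<T> upstream, Consumer<T> action) {
        return new Iterator<T>() {
            public boolean hasNext() { return upstream.hasNext(); }
            public T next() { T v = upstream.next(); action.accept(v); return v; }
        };
    }

    // Toy "limit": stops asking upstream once n elements have been handed out.
    static <T> Iterator<T> limit(Iterator<T> upstream, long n) {
        return new Iterator<T>() {
            long given = 0;
            public boolean hasNext() { return given < n && upstream.hasNext(); }
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                given++;
                return upstream.next();
            }
        };
    }

    // Toy "skip": on first use, pulls n elements from upstream and drops them.
    static <T> Iterator<T> skip(Iterator<T> upstream, long n) {
        return new Iterator<T>() {
            boolean skipped = false;
            private void discard() {
                if (skipped) return;
                skipped = true;
                for (long i = 0; i < n && upstream.hasNext(); i++) upstream.next();
            }
            public boolean hasNext() { discard(); return upstream.hasNext(); }
            public T next() { discard(); return upstream.next(); }
        };
    }

    // Builds s1..s4 as in the answer and drains s4, playing the role of forEach.
    static String run(boolean useSkip) {
        StringBuilder log = new StringBuilder();
        Iterator<Integer> s1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9).iterator();
        Iterator<Integer> s2 = peek(s1, x -> log.append(" A").append(x));
        Iterator<Integer> s3 = useSkip ? skip(s2, 6) : limit(s2, 3);
        Iterator<Integer> s4 = peek(s3, x -> log.append("B").append(x));
        while (s4.hasNext()) {
            Integer v = s4.next();        // pull first, so A and B are logged before C
            log.append("C").append(v);
        }
        return log.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // limit(3) variant
        System.out.println(run(true));  // skip(6) variant
    }
}
```

In this model, s2 has no idea whether s3 is a limit or a skip. The only difference is how many times s3 calls s2.next().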
Consider this pipeline, which is listed in this blog post:
IntStream.iterate(0, i -> (i + 1) % 2)
    .distinct()
    .limit(10)
    .forEach(System.out::println);
When you execute the above, the program will never halt. Why? Because:
IntStream i1 = IntStream.iterate(0, i -> (i + 1) % 2);
IntStream i2 = i1.distinct();
IntStream i3 = i2.limit(10);
i3.forEach(System.out::println);
Which means:
- i1 generates an infinite amount of alternating values: 0, 1, 0, 1, 0, 1, ...
- i2 consumes all values that have been encountered before, passing on only "new" values, i.e. there are a total of 2 values coming out of i2.
- i3 passes on 10 values, then stops.

This algorithm will never stop, because i3 waits for i2 to produce 8 more values after 0 and 1, but those values never appear, while i1 never stops feeding values to i2.
It doesn't matter that at some point in the pipeline, more than 10 values had been produced. All that matters is that i3 has never seen those 10 values.
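One way to make such a pipeline terminate, assuming you are happy to look only at the first 10 generated values, is to put limit() before distinct(), so the infinite source is bounded before any stage waits for "new" values. A sketch (the class name is made up, and note this changes the meaning from "10 distinct values" to "the distinct values among the first 10"):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class DistinctLimitDemo {
    // limit(10) caps the infinite source; distinct() then sees a finite stream.
    static List<Integer> limitThenDistinct() {
        return IntStream.iterate(0, i -> (i + 1) % 2)
            .limit(10)
            .distinct()
            .boxed()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(limitThenDistinct()); // [0, 1]
    }
}
```

Here limit() sees its 10 values directly from the source, so it can short-circuit, and distinct() just filters a finite input.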
Is it just that "every action before skip is executed while not everyone before limit is"?
Nope. All operations before either skip() or limit() are executed. In both of your executions, you get A1 - A3. But limit() may short-circuit the pipeline, aborting value consumption once the event of interest (the limit is reached) has occurred.