I have the following sample code:
System.out.println( "Result: " + Stream.of(1, 2, 3) .filter(i -> { System.out.println(i); return true; }) .findFirst() .get() ); System.out.println("-----------"); System.out.println( "Result: " + Stream.of(1, 2, 3) .flatMap(i -> Stream.of(i - 1, i, i + 1)) .flatMap(i -> Stream.of(i - 1, i, i + 1)) .filter(i -> { System.out.println(i); return true; }) .findFirst() .get() );
The output is as follows:
1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1
From this I see that in the first case the stream really behaves lazily: since we use findFirst(), once the first element is found the filtering lambda is not invoked for the rest. However, in the second case, which uses flatMaps, we see that although the first element fulfilling the filter condition is found (it's just any first element, since the lambda always returns true), further contents of the stream are still fed through the filtering function.

I am trying to understand why it behaves like this rather than giving up after the first element is computed, as in the first case. Any helpful information would be appreciated.
Answer: Based on the comments and the answers below, flatMap is only partially lazy, i.e. it reads the first sub-stream fully and only goes on to the next one when required. Reading a single sub-stream is eager, but moving from one sub-stream to the next is lazy. If this behavior is intended, the API should have let the function return an Iterable instead of a Stream.
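A minimal sketch of this partial laziness (the peek logging and the "a"/"b" values are mine, not from the question):

import java.util.stream.Stream;

public class PartialLaziness {
    public static void main(String[] args) {
        Stream.of("a", "b")
              .peek(s -> System.out.println("outer: " + s))
              .flatMap(s -> Stream.of(s + "1", s + "2", s + "3")
                                  .peek(t -> System.out.println("  inner: " + t)))
              .findFirst()
              .ifPresent(r -> System.out.println("found: " + r));
    }
}

On a JDK affected by JDK-8075939 this prints all three inner elements of the "a" sub-stream before stopping, while "b" is never touched; on Java 10+ only "inner: a1" is printed.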
Streams are lazy because intermediate operations are not evaluated until a terminal operation is invoked. Each intermediate operation creates a new stream, stores the provided operation/function, and returns the new stream. The pipeline accumulates these newly created streams.
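For example (a minimal sketch; the print statements are only there to show when the lambda actually runs):

import java.util.stream.Stream;

public class LazyPipeline {
    public static void main(String[] args) {
        // Building the pipeline evaluates nothing yet.
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .filter(i -> { System.out.println("filter: " + i); return true; });
        System.out.println("pipeline built, nothing filtered yet");

        // Only the terminal operation pulls elements through the pipeline,
        // and findFirst() stops after the first one.
        pipeline.findFirst(); // prints "filter: 1"
    }
}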
The difference is that the map operation produces one output value for each input value, whereas the flatMap operation produces an arbitrary number (zero or more) of output values for each input value.
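To illustrate (a minimal sketch reusing the i - 1, i, i + 1 mapper from the question):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MapVsFlatMap {
    public static void main(String[] args) {
        // map: exactly one output element per input element.
        List<Integer> mapped = Stream.of(1, 2, 3)
                .map(i -> i * 10)
                .collect(Collectors.toList());
        System.out.println(mapped); // [10, 20, 30]

        // flatMap: zero or more output elements per input element.
        List<Integer> flattened = Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .collect(Collectors.toList());
        System.out.println(flattened); // [0, 1, 2, 1, 2, 3, 2, 3, 4]
    }
}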
TL;DR, this has been addressed in JDK-8075939 and fixed in Java 10 (and backported to Java 8 in JDK-8225328).
When looking into the implementation (ReferencePipeline.java) we see the method [link]:
@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}
which will be invoked for the findFirst operation. The special thing to take care about is sink.cancellationRequested(), which allows the loop to end on the first match. Compare it to [link]:
@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}
The method for advancing one item ends up calling forEach on the sub-stream without any possibility of earlier termination, and the comment at the beginning of the flatMap method even mentions this absent feature.
Since this is more than just an optimization thing as it implies that the code simply breaks when the sub-stream is infinite, I hope that the developers soon prove that they “can do better than this”…
To illustrate the implications: while Stream.iterate(0, i->i+1).findFirst() works as expected, Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() will end up in an infinite loop.
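As a runnable sketch (JDK-dependent, per the TL;DR above: on Java 10+, or Java 8 with the JDK-8225328 backport, the second call terminates too):

import java.util.stream.Stream;

public class InfiniteSubStream {
    public static void main(String[] args) {
        // Terminates immediately: findFirst() short-circuits the infinite source.
        System.out.println(Stream.iterate(0, i -> i + 1).findFirst().get()); // 0

        // On affected JDKs this line never returns, because flatMap
        // eagerly drains the entire infinite sub-stream.
        System.out.println(
                Stream.of("").flatMap(x -> Stream.iterate(0, i -> i + 1)).findFirst().get());
    }
}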
Regarding the specification, most of it can be found in the chapter “Stream operations and pipelines” of the package specification:
…
Intermediate operations return a new stream. They are always lazy;
…
… Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source. (This behavior becomes even more important when the input stream is infinite and not merely large.)
…
Further, some operations are deemed short-circuiting operations. An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result. A terminal operation is short-circuiting if, when presented with infinite input, it may terminate in finite time. Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.
It’s clear that a short-circuiting operation doesn’t guarantee finite-time termination, e.g. when a filter doesn’t match any item the processing can’t complete, but an implementation that doesn’t support any termination in finite time, by simply ignoring the short-circuiting nature of an operation, is far outside the specification.
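For instance, limit() is a short-circuiting intermediate operation and findFirst() a short-circuiting terminal one; a minimal sketch of the distinction drawn above:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ShortCircuiting {
    public static void main(String[] args) {
        // limit() is short-circuiting: given infinite input, it yields a finite stream.
        List<Integer> firstFive = Stream.iterate(0, i -> i + 1)
                .limit(5)
                .collect(Collectors.toList());
        System.out.println(firstFive); // [0, 1, 2, 3, 4]

        // findFirst() is short-circuiting, but that is necessary, not sufficient:
        // if the filter never matches, the pipeline still runs forever.
        // Stream.iterate(0, i -> i + 1).filter(i -> i < 0).findFirst(); // never returns
    }
}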