If I execute the following code, which flattens a Stream<Stream<Integer>> into a single stream, once using flatMap and once by "concatenating" the inner streams with Stream.concat(), I obtain the same correct result in both cases, but the number of filtering operations is different.
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class FlatMapVsReduce {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

        Predicate<Integer> predicate1 = i -> {
            System.out.println("testing first condition with " + i);
            return i == 3;
        };

        Predicate<Integer> predicate2 = i -> {
            System.out.println("testing second condition with " + i);
            return i == 7;
        };

        System.out.println("Testing with flatMap");
        Integer result1 =
            Stream.of(list.stream().filter(predicate1),
                      list.stream().filter(predicate2))
                  .flatMap(Function.identity())
                  .peek(i -> System.out.println("peeking " + i))
                  .findFirst()
                  .orElse(null);
        System.out.println("result1 = " + result1);
        System.out.println();

        System.out.println("Testing with reduce");
        Integer result2 =
            Stream.of(list.stream().filter(predicate1),
                      list.stream().filter(predicate2))
                  .reduce(Stream::concat)
                  .orElseGet(Stream::empty)
                  .peek(i -> System.out.println("peeking " + i))
                  .findFirst()
                  .orElse(null);
        System.out.println("result2 = " + result2);
    }
}
I get the expected result in both cases (3). However, the first version applies the first filter to every element of the collection, whereas the second stops as soon as a match is found. The output is:
Testing with flatMap
testing first condition with 1
testing first condition with 2
testing first condition with 3
peeking 3
testing first condition with 4
testing first condition with 5
testing first condition with 6
testing first condition with 7
testing first condition with 8
testing first condition with 9
result1 = 3
Testing with reduce
testing first condition with 1
testing first condition with 2
testing first condition with 3
peeking 3
result2 = 3
Why is there a difference in behavior between the two? Could the JDK code be improved to be as efficient in the first scenario as in the second, or is there something in flatMap that makes this impossible?
Addendum: the following alternative is as efficient as the one using reduce, but I still can't explain why:
Integer result3 = Stream.of(predicate1, predicate2)
        .flatMap(c -> list.stream().filter(c).limit(1))
        .peek(i -> System.out.println("peeking " + i))
        .findFirst()
        .orElse(null);
System.out.println("result3 = " + result3);
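To see why the limit(1) variant stays cheap, here is a hedged sketch (the counter, class name and method are mine, not from the original code): counting predicate evaluations shows that limit(1) ends each inner stream at its first match, so even an eager flatMap can never push, and therefore never test, more elements than needed:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class LimitDemo {
    // counts how many times the predicate is evaluated
    static final AtomicInteger TESTS = new AtomicInteger();

    static Integer firstMatch(List<Integer> list) {
        Predicate<Integer> predicate1 = i -> {
            TESTS.incrementAndGet();
            return i == 3;
        };
        return Stream.of(predicate1)
                // limit(1) cuts the inner stream off after its first match
                .flatMap(p -> list.stream().filter(p).limit(1))
                .findFirst()
                .orElse(null);
    }

    public static void main(String[] args) {
        Integer r = firstMatch(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
        // the predicate only ran on 1, 2 and 3
        System.out.println(r + " after " + TESTS.get() + " tests"); // 3 after 3 tests
    }
}
```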
From the implementation of flatMap in OpenJDK, what I understand is that flatMap pushes the whole content of the incoming stream downstream:

result.sequential().forEach(downstreamAsInt);

On the other hand, Stream::concat seems to handle the pull and not send everything at once.
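That pull behavior can be sketched with a toy concatenation built on iterators (pullConcat is my own name, and this is a simplification, not the real Streams.ConcatSpliterator): elements are only consumed when the downstream asks for them, so findFirst pulls nothing past the first match and never touches the second stream:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PullConcat {
    // toy pull-based concat: hands out elements one at a time, on demand
    static <T> Stream<T> pullConcat(Stream<T> a, Stream<T> b) {
        Iterator<T> ia = a.iterator();
        Iterator<T> ib = b.iterator();
        Iterator<T> both = new Iterator<T>() {
            @Override public boolean hasNext() { return ia.hasNext() || ib.hasNext(); }
            @Override public T next() { return ia.hasNext() ? ia.next() : ib.next(); }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(both, 0), false);
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        Integer first = pullConcat(
                list.stream().filter(i -> i == 3),
                list.stream().filter(i -> i == 7))
                .findFirst()
                .orElse(null);
        // findFirst pulls just enough elements to reach 3
        System.out.println("first = " + first); // first = 3
    }
}
```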
I suspect that your test does not show the full picture:

- With flatMap, the second stream is only considered when the first is depleted.
- With reduce, all the streams are pushed into the final concatenated stream, because the reduced object does not make sense until all the content of the input stream is consumed.

Which means that using one or the other depends on how complex your inputs are. If you have an infinite Stream<Stream<Integer>>, reduce will never finish.
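That last point can be sketched as follows (class and method names are mine): with an infinite Stream<Stream<Integer>>, flatMap still terminates because it pulls inner streams from the outer stream one at a time, whereas reduce(Stream::concat) would try to fold the entire outer stream before producing anything:

```java
import java.util.function.Function;
import java.util.stream.Stream;

public class InfiniteOuter {
    static Integer firstAtLeast(int k) {
        // infinite Stream<Stream<Integer>>: <0>, <1>, <2>, ...
        Stream<Stream<Integer>> outer =
                Stream.iterate(0, n -> n + 1).map(Stream::of);
        return outer
                .flatMap(Function.identity()) // pulls inner streams lazily
                .filter(n -> n >= k)
                .findFirst()
                .orElse(null);
    }

    public static void main(String[] args) {
        System.out.println("first = " + firstAtLeast(3)); // first = 3

        // The reduce version would never return: it has to consume the whole
        // (infinite) outer stream before the concatenated stream exists:
        // outer.reduce(Stream::concat).orElseGet(Stream::empty).findFirst();
    }
}
```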