Note: I am not necessarily looking for solutions to the concrete example problems described below. I am genuinely interested in why this isn't possible out of the box in Java 8.
Java streams are lazy. At the very end they have a single terminal operation.
My interpretation is that this terminal operation will pull all the values through the stream. None of the intermediate operations can do that. Why are there no intermediate operations that pull an arbitrary number of elements through the stream? Something like this:
stream
.mapMultiple(this::consumeMultipleElements) // or groupAndMap or combine or intermediateCollect or reverseFlatMap
.collect(Collectors.toList());
When a downstream operation tries to advance the stream once, the intermediate operation might try to advance the upstream multiple times (or not at all).
I can see a couple of use cases:
(These are just examples. It is certainly possible to handle these use cases, but it is "not the streaming way", and these solutions lack the desirable laziness that Streams have.)
Combine multiple elements into a single new element to be passed down the rest of the stream. (E.g., making pairs (1,2,3,4,5,6) ➔ ((1,2),(3,4),(5,6)))
// Something like this,
// without needing to consume the entire stream upfront,
// and also more generic. (The combiner should decide for itself how many elements to consume/combine per resulting element. Maybe the combiner is a Consumer<Iterator<E>> or a Consumer<Supplier<E>>)
public <E, R> Stream<R> combine(Stream<E> stream, BiFunction<E, E, R> combiner) {
    // Eager: the whole stream is materialized before the first pair is produced
    List<E> completeList = stream.collect(Collectors.toList());
    // Any trailing unpaired element is dropped
    return IntStream.range(0, completeList.size() / 2)
            .mapToObj(i -> combiner.apply(
                    completeList.get(2 * i),
                    completeList.get(2 * i + 1)));
}
Determine if the Stream is empty (mapping the Stream to an Optional non-empty Stream)
// Something like this, without needing to consume the entire stream
public <E> Optional<Stream<E>> toNonEmptyStream(Stream<E> stream) {
List<E> elements = stream.collect(Collectors.toList());
return elements.isEmpty()
? Optional.empty()
: Optional.of(elements.stream());
}
Having a lazy Iterator that doesn't terminate the stream (allowing one to skip elements based on more complicated logic than just skip(long n)).
Iterator<E> iterator = stream.iterator();
// Allow this without throwing a "java.lang.IllegalStateException: stream has already been operated upon or closed"
stream.collect(Collectors.toList());
When they designed Streams and everything around them, did they forget about these use cases or did they explicitly leave this out?
I understand that these might give unexpected results when dealing with parallel streams, but in my opinion this is a risk that can be documented.
Well, all of the operations that you want are actually achievable in the Stream API, but not out of the box.
Combining multiple elements into pairs of elements: you need a custom Spliterator for that. Tagir Valeev has shown how to do that. He also has an absolute beast of a library called StreamEx that does many other useful things that are not supported out of the box.
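To make the custom Spliterator idea concrete, here is a minimal sketch of a lazy pairing operation. It is my own illustration, not Tagir Valeev's code and not the StreamEx API; the class name LazyPairs is hypothetical. It wraps the source stream's spliterator and pulls two upstream elements for every element it hands downstream, which is exactly the "advance the upstream multiple times" behaviour from the question. It assumes a sequential stream.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyPairs {
    // Hypothetical helper: lazily combines (e1,e2), (e3,e4), ... into single results.
    // A trailing unpaired element is dropped, like the List-based version in the question.
    static <E, R> Stream<R> combine(Stream<E> stream, BiFunction<E, E, R> combiner) {
        Spliterator<E> source = stream.spliterator(); // nothing is pulled yet
        Spliterator<R> pairs = new Spliterators.AbstractSpliterator<R>(
                Long.MAX_VALUE, source.characteristics() & Spliterator.ORDERED) {
            // Holds the element most recently pulled from the source
            private E current;

            @Override
            public boolean tryAdvance(Consumer<? super R> action) {
                // Pull two elements from upstream for every one element emitted downstream
                if (!source.tryAdvance(e -> current = e)) return false;
                E first = current;
                if (!source.tryAdvance(e -> current = e)) return false;
                action.accept(combiner.apply(first, current));
                return true;
            }
        };
        return StreamSupport.stream(pairs, false).onClose(stream::close);
    }
}
```

Because elements are only pulled on demand, this also works on an infinite source, e.g. `LazyPairs.combine(Stream.iterate(1, i -> i + 1), (a, b) -> a + b).limit(3)` yields 3, 7, 11.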
I did not understand your second example, but I bet it's doable also.
Skipping based on more complicated logic is available in Java 9 via dropWhile and takeWhile, which take a Predicate as input.
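A short sketch of the two Java 9 operations (the class and method names DropTakeDemo and middle are made up for illustration). Note that takeWhile stops at the first element that fails the predicate, so later elements that would match again are not picked up, and both operations stay lazy enough to work on an infinite stream.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DropTakeDemo {
    // Requires Java 9+: dropWhile and takeWhile do not exist in Java 8.
    static List<Integer> middle(Stream<Integer> numbers) {
        return numbers
                .dropWhile(n -> n < 3)  // discards the leading elements below 3
                .takeWhile(n -> n < 7)  // stops for good at the first element >= 7
                .collect(Collectors.toList());
    }
}
```

For example, `middle(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 1))` returns [3, 4, 5, 6], and `middle(Stream.iterate(1, n -> n + 1))` returns the same list without running forever.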
Also note that the claim that none of the intermediate operations can do that is not accurate: sorted and distinct do exactly that, because they can't work otherwise. flatMap in Java 8 also behaves somewhat like that, since it fully consumes each inner stream even when a downstream operation short-circuits, but that is treated more like a bug.
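A small experiment illustrating the point about sorted (the class PullCountDemo is hypothetical). A counter in peek records how many upstream elements are pulled before findFirst completes. Without sorted, the short-circuiting terminal operation pulls only one element; with sorted in between, every element has to be pulled before the smallest one can be emitted.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class PullCountDemo {
    // Counts upstream elements seen when findFirst() follows the source directly
    static int pulledWithoutSorted() {
        AtomicInteger pulled = new AtomicInteger();
        Stream.of(5, 3, 1, 4, 2)
              .peek(x -> pulled.incrementAndGet())
              .findFirst();              // short-circuits after the first element
        return pulled.get();
    }

    // Same pipeline, but sorted() sits between the source and findFirst()
    static int pulledWithSorted() {
        AtomicInteger pulled = new AtomicInteger();
        Stream.of(5, 3, 1, 4, 2)
              .peek(x -> pulled.incrementAndGet())
              .sorted()                  // must see every element before emitting the smallest
              .findFirst();
        return pulled.get();
    }
}
```

On a sequential stream the first method reports 1 pulled element and the second reports all 5.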
One more thing: intermediate operations on parallel streams process elements in no defined order, so such a stateful intermediate operation would see unpredictable entries on a parallel stream. On the other hand, you always have the option to abuse side effects, like this:
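List<Something> list = Collections.synchronizedList(new ArrayList<>());
stream.map(x -> {
    list.add(x);  // side effect: records every element that passes through
    return x;     // replace with your actual mapping
})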
I would not do that if I were you, and I would think hard about whether you really need it, but just in case...
Not every terminal operation will “pull all the values through the stream”. The terminal operations iterator() and spliterator() do not immediately fetch all values and allow lazy processing, including constructing a new Stream again. For the latter, it’s strongly recommended to use spliterator(), as this allows more meta information to be passed to the new stream and also implies less wrapping of objects.
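As an illustration of the question's third use case (skipping with more complicated logic than skip(long n)), here is a sketch that uses spliterator() for controlled traversal and then turns the remainder back into a Stream. The rule it implements is a made-up example: the first element says how many of the following elements to discard. The helper name skipByHeader is hypothetical. The header and the skipped elements are pulled when the method is called; everything after them stays lazy.

```java
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SkipByHeader {
    // Hypothetical rule: the first element is a count of following elements to discard
    static Stream<Integer> skipByHeader(Stream<Integer> s) {
        boolean parallel = s.isParallel();
        Spliterator<Integer> sp = s.spliterator();   // terminal, but nothing is pulled yet
        int[] header = new int[1];
        if (!sp.tryAdvance(h -> header[0] = h)) {
            s.close();
            return Stream.empty();                     // empty input: nothing to skip
        }
        for (int i = 0; i < header[0]; i++) {
            if (!sp.tryAdvance(x -> { })) break;       // pull and discard one element
        }
        // Hand the untouched remainder back as a new lazy Stream
        return StreamSupport.stream(sp, parallel).onClose(s::close);
    }
}
```

For example, `skipByHeader(Stream.of(2, 10, 20, 30, 40))` yields 30, 40, and `skipByHeader(Stream.iterate(3, i -> i + 1)).limit(2)` yields 7, 8 even though the source is infinite.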
E.g. your second example could be implemented as
public static <T> Stream<T> replaceWhenEmpty(Stream<T> s, Supplier<Stream<T>> fallBack) {
    boolean parallel = s.isParallel();
    Spliterator<T> sp = s.spliterator();        // lazy: no element fetched yet
    Stream.Builder<T> firstElement;
    // Known to be empty, or the attempt to fetch the first element fails
    if (sp.getExactSizeIfKnown() == 0 || !sp.tryAdvance(firstElement = Stream.builder())) {
        s.close();
        return fallBack.get();
    }
    // Re-attach the already fetched first element in front of the untouched rest
    return Stream.concat(firstElement.build(), StreamSupport.stream(sp, parallel))
            .onClose(s::close);
}
For your general question, I don’t see what a general abstraction of these examples should look like, other than the spliterator() method that already exists. As the documentation puts it:
However, if the provided stream operations do not offer the desired functionality, the BaseStream.iterator() and BaseStream.spliterator() operations can be used to perform a controlled traversal.
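To show how close controlled traversal gets to the mapMultiple operation from the question, here is a sketch of a generic, lazy combine. It is a hypothetical helper, not part of the JDK. The question suggests a Consumer<Iterator<E>>, but a Consumer cannot return the combined value, so this sketch assumes a Function<Iterator<E>, R> instead: on each downstream request it hands the upstream Iterator to the combiner, which decides how many elements to pull. It assumes a sequential stream and a combiner that consumes at least one element per call (otherwise it would produce results forever).

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class GenericCombine {
    // Hypothetical helper: each downstream element is built by letting the combiner
    // pull as many upstream elements as it needs from the shared Iterator.
    static <E, R> Stream<R> combine(Stream<E> stream, Function<Iterator<E>, R> combiner) {
        Iterator<E> it = stream.iterator();   // lazy: nothing is pulled yet
        Spliterator<R> out = new Spliterators.AbstractSpliterator<R>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super R> action) {
                if (!it.hasNext()) return false;       // upstream exhausted
                action.accept(combiner.apply(it));     // combiner decides how much to consume
                return true;
            }
        };
        return StreamSupport.stream(out, false).onClose(stream::close);
    }
}
```

For example, with a combiner that reads a count n and then sums the next n elements, `Stream.of(2, 10, 20, 3, 1, 1, 1)` becomes 30, 3; with `it -> it.next() * it.next()` on `Stream.iterate(1, i -> i + 1)`, `limit(3)` yields 2, 12, 30.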