I think your assumption about efficiency is kind of backwards. You get this huge efficiency payback if you're only going to use the data once, because you don't have to store it, and streams give you powerful "loop fusion" optimizations that let you flow the whole data efficiently through the pipeline.
If you want to re-use the same data, then by definition you either have to generate it twice (deterministically) or store it. If it already happens to be in a collection, great; then iterating it twice is cheap.
We did experiment in the design with "forked streams". What we found was that supporting this had real costs; it burdened the common case (use once) at the expense of the uncommon case. The big problem was dealing with "what happens when the two pipelines don't consume data at the same rate." Now you're back to buffering anyway. This was a feature that clearly didn't carry its weight.
If you want to operate on the same data repeatedly, either store it, or structure your operations as Consumers and do the following:
stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });
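To make the one-pass idea concrete, here is a minimal sketch using only the JDK. The class name `FanOutExample`, the helper `fanOut`, and the sample data are made up for illustration; the only thing taken from the answer is the `forEach` that calls both consumers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

class FanOutExample {

    // Hypothetical helper: hands every element to both consumers
    // during a single traversal of the stream.
    static <T> void fanOut(Stream<T> source, Consumer<? super T> a, Consumer<? super T> b) {
        source.forEach(e -> { a.accept(e); b.accept(e); });
    }

    public static void main(String[] args) {
        List<String> upper = new ArrayList<>();
        List<Integer> lengths = new ArrayList<>();

        fanOut(Stream.of("d2", "a2", "b1"),
               s -> upper.add(s.toUpperCase()),
               s -> lengths.add(s.length()));

        System.out.println(upper);   // [D2, A2, B1]
        System.out.println(lengths); // [2, 2, 2]
    }
}
```

Nothing is buffered here: each element is seen by both consumers and then dropped, which is why this only works when both operations can be written as per-element callbacks.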
You might also look into the RxJava library, as its processing model lends itself better to this kind of "stream forking".
You can use a local variable with a Supplier to set up common parts of the stream pipeline.
From http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/:
Reusing Streams
Java 8 streams cannot be reused. As soon as you call any terminal operation the stream is closed:
Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception
Calling `noneMatch` after `anyMatch` on the same stream results in the following exception:
java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)
To overcome this limitation we have to create a new stream chain for every terminal operation we want to execute, e.g. we could create a stream supplier to construct a new stream with all intermediate operations already set up:
Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));
streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok
Each call to get() constructs a new stream on which we are safe to call the desired terminal operation.
Use a Supplier to produce the stream for each terminal operation.
Supplier<Stream<Integer>> streamSupplier = () -> list.stream();
Whenever you need a stream of that collection, use streamSupplier.get() to get a new stream.
Examples:
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
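For completeness, a self-contained version of the above that can be run as-is. The class name `SupplierExample`, the helper `reuseFails`, the list contents, and the predicates are placeholders I picked for the sketch; the `Supplier<Stream<Integer>>` pattern is the one from the answer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

class SupplierExample {

    // Returns true if a second terminal operation on the same
    // Stream instance is rejected with IllegalStateException.
    static boolean reuseFails(List<Integer> list) {
        Stream<Integer> stream = list.stream();
        stream.anyMatch(n -> n > 2);          // first terminal operation: fine
        try {
            stream.allMatch(n -> n > 0);      // second one on the same stream
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);

        // Each get() builds a fresh stream over the same collection.
        Supplier<Stream<Integer>> streamSupplier = list::stream;

        System.out.println(streamSupplier.get().anyMatch(n -> n > 2)); // true
        System.out.println(streamSupplier.get().allMatch(n -> n > 0)); // true
        System.out.println(reuseFails(list));                          // true
    }
}
```

Note that this only avoids recomputation because the data already sits in a collection. If the supplier wraps an expensive source, every `get()` pays for it again.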
We've implemented a duplicate() method for streams in jOOλ, an Open Source library that we created to improve integration testing for jOOQ. Essentially, you can just write:
Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();
Internally, there is a buffer storing all values that have been consumed from one stream but not from the other. That's probably as efficient as it gets if your two streams are consumed at about the same rate, and if you can live with the lack of thread-safety.
Here's how the algorithm works:
static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    // Declared as LinkedList (not List) because offer() and poll() are queue methods.
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();
    // Holds whichever of the two iterators pulled from the source first.
    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };
    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();
            return !gap.isEmpty();
        }
        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;
            // The iterator that is ahead reads the source and buffers the value.
            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }
            // The other iterator drains the buffer.
            return gap.poll();
        }
    }
    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}
More source code here
Tuple2 is probably like your Pair type, whereas Seq is Stream with some enhancements.
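If you want to try the buffering idea without pulling in jOOλ, the following is a rough JDK-only adaptation of the snippet above. It is not the library's code: the class name `DuplicateSketch`, the `List<Stream<T>>` return type (instead of `Tuple2<Seq<T>, Seq<T>>`), and the `toStream` helper are my own substitutions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class DuplicateSketch {

    // Returns two streams over the same elements that share one buffer.
    // Not thread-safe, same as the snippet it is adapted from.
    static <T> List<Stream<T>> duplicate(Stream<T> stream) {
        final LinkedList<T> gap = new LinkedList<>(); // read by the leader, not yet by the follower
        final Iterator<T> it = stream.iterator();
        final Object[] ahead = { null };              // the copy that pulled from the source first

        class Duplicate implements Iterator<T> {
            @Override
            public boolean hasNext() {
                if (ahead[0] == null || ahead[0] == this)
                    return it.hasNext();
                return !gap.isEmpty();
            }

            @Override
            public T next() {
                if (ahead[0] == null)
                    ahead[0] = this;
                if (ahead[0] == this) {
                    T value = it.next();
                    gap.offer(value);
                    return value;
                }
                return gap.poll();
            }
        }

        List<Stream<T>> copies = new ArrayList<>();
        copies.add(toStream(new Duplicate()));
        copies.add(toStream(new Duplicate()));
        return copies;
    }

    // Hypothetical helper: wraps an iterator in a sequential stream.
    private static <T> Stream<T> toStream(Iterator<T> iterator) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        List<Stream<Integer>> copies = duplicate(Stream.of(1, 2, 3));
        System.out.println(copies.get(0).collect(Collectors.toList())); // [1, 2, 3]
        System.out.println(copies.get(1).collect(Collectors.toList())); // [1, 2, 3]
    }
}
```

Consuming the first copy completely before touching the second, as `main` does, puts every element into the buffer, so in that case you gain nothing over collecting into a list. Also, as far as I can tell from reading this sketch, the copy that is behind reports no more elements as soon as it has drained the buffer, so it is best suited to cases where the leading copy stays ahead.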
You could create a stream of runnables (for example):
results.stream()
    .flatMap(either -> Stream.<Runnable> of(
        () -> failure(either.left()),
        () -> success(either.right())))
    .forEach(Runnable::run);
Where failure and success are the operations to apply. This will, however, create quite a few temporary objects and may not be more efficient than starting from a collection and streaming/iterating it twice.
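Here is a runnable sketch of the same trick. The `Either` class below is a hypothetical stand-in with two public fields (the question's real type is not shown here), and the two operations just append to a log so the execution order is visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class RunnableFanOut {

    // Hypothetical minimal stand-in for the Either type in the question.
    static final class Either {
        final String left;
        final Integer right;

        Either(String left, Integer right) {
            this.left = left;
            this.right = right;
        }
    }

    // Turns each element into two Runnables and runs them in encounter order.
    static List<String> process(List<Either> results) {
        List<String> log = new ArrayList<>();
        results.stream()
               .flatMap(either -> Stream.<Runnable>of(
                       () -> log.add("failure:" + either.left),
                       () -> log.add("success:" + either.right)))
               .forEach(Runnable::run);
        return log;
    }

    public static void main(String[] args) {
        List<Either> results = Arrays.asList(new Either("e1", 1), new Either("e2", 2));
        System.out.println(process(results));
        // [failure:e1, success:1, failure:e2, success:2]
    }
}
```

Two lambdas and one small stream are allocated per element, which is the temporary-object cost mentioned above.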
Another way to handle the elements multiple times is to use Stream.peek(Consumer):
doSomething().stream()
    .peek(either -> handleFailure(either.left()))
    .forEach(either -> handleSuccess(either.right()));
peek(Consumer) can be chained as many times as needed:
doSomething().stream()
    .peek(element -> handleFoo(element.foo()))
    .peek(element -> handleBar(element.bar()))
    .peek(element -> handleBaz(element.baz()))
    .forEach(element -> handleQux(element.qux()));
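A small runnable sketch of the chained form, mainly to show the order in which the handlers fire on a sequential stream. The class name `PeekExample` and the logging "handlers" are placeholders for the real `handleFoo`/`handleBar`/`handleQux`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PeekExample {

    // Runs three "handlers" per element in one pass and records the call order.
    static List<String> handle(List<String> input) {
        List<String> log = new ArrayList<>();
        input.stream()
             .peek(e -> log.add("foo:" + e))
             .peek(e -> log.add("bar:" + e))
             .forEach(e -> log.add("qux:" + e));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(handle(Arrays.asList("a", "b")));
        // [foo:a, bar:a, qux:a, foo:b, bar:b, qux:b]
    }
}
```

Each element passes through all handlers before the next element is touched. Keep in mind that the Javadoc describes peek as existing mainly to support debugging, and the peek actions only run for elements the terminal operation actually pulls through the pipeline.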
cyclops-react, a library I contribute to, has a static method that will allow you to duplicate a Stream (it returns a jOOλ Tuple of Streams).
Stream<Integer> stream = Stream.of(1,2,3);
Tuple2<Stream<Integer>,Stream<Integer>> streams = StreamUtils.duplicate(stream);
See the comments: there is a performance penalty when using duplicate on an existing Stream. A more performant alternative is to use Streamable:
There is also a (lazy) Streamable class that can be constructed from a Stream, Iterable or Array and replayed multiple times.
Streamable<Integer> streamable = Streamable.of(1,2,3);
streamable.stream().forEach(System.out::println);
streamable.stream().forEach(System.out::println);
AsStreamable.synchronizedFromStream(stream) can be used to create a Streamable that lazily populates its backing collection in a way that can be shared across threads. Streamable.fromStream(stream) will not incur any synchronization overhead.