
Copy a stream to avoid "stream has already been operated upon or closed"

I think your assumption about efficiency is kind of backwards. You get a huge efficiency payback if you're only going to use the data once, because you don't have to store it, and streams give you powerful "loop fusion" optimizations that let the whole data set flow efficiently through the pipeline.

If you want to re-use the same data, then by definition you either have to generate it twice (deterministically) or store it. If it already happens to be in a collection, great; then iterating it twice is cheap.
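As a minimal sketch of the "store it" option: collect the one-shot stream into a list once, then stream the list as often as needed. The class and method names (`MaterializeOnce`, `materialize`) are made up for this illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class MaterializeOnce {
    // Consumes the one-shot stream exactly once and keeps its elements in a list.
    static <T> List<T> materialize(Stream<T> oneShot) {
        return oneShot.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> squares =
                materialize(IntStream.rangeClosed(1, 5).map(i -> i * i).boxed());

        // Each pass starts a fresh stream over the stored elements.
        int sum = squares.stream().mapToInt(Integer::intValue).sum();
        int max = squares.stream().mapToInt(Integer::intValue).max().getAsInt();

        System.out.println(sum); // 55
        System.out.println(max); // 25
    }
}
```

The cost is the memory for the list, which is the buffering trade-off described above.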

We did experiment in the design with "forked streams". What we found was that supporting this had real costs; it burdened the common case (use once) for the benefit of the uncommon case. The big problem was dealing with "what happens when the two pipelines don't consume data at the same rate." Now you're back to buffering anyway. This was a feature that clearly didn't carry its weight.

If you want to operate on the same data repeatedly, either store it, or structure your operations as Consumers and do the following:

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });
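A runnable version of that one-liner might look like the following sketch. The helper `forEachBoth` and the two example consumers are hypothetical names chosen for illustration; the point is only that both consumers see every element during a single traversal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

class SinglePassConsumers {
    // Feeds every element to both consumers in a single traversal of the stream.
    static <T> void forEachBoth(Stream<T> stream,
                                Consumer<? super T> consumerA,
                                Consumer<? super T> consumerB) {
        stream.forEach(e -> {
            consumerA.accept(e);
            consumerB.accept(e);
        });
    }

    public static void main(String[] args) {
        List<String> upper = new ArrayList<>();
        List<Integer> lengths = new ArrayList<>();

        forEachBoth(Stream.of("a", "bb", "ccc"),
                s -> upper.add(s.toUpperCase()),
                s -> lengths.add(s.length()));

        System.out.println(upper);   // [A, BB, CCC]
        System.out.println(lengths); // [1, 2, 3]
    }
}
```

Because the consumers here mutate plain ArrayLists, this sketch assumes a sequential stream.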

You might also look into the RxJava library, as its processing model lends itself better to this kind of "stream forking".


You can use a local variable with a Supplier to set up common parts of the stream pipeline.

From http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/:

Reusing Streams

Java 8 streams cannot be reused. As soon as you call any terminal operation the stream is closed:

Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

Calling `noneMatch` after `anyMatch` on the same stream results in the following exception:
java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

To overcome this limitation we have to create a new stream chain for every terminal operation we want to execute. For example, we could create a stream supplier that constructs a new stream with all intermediate operations already set up:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

Each call to get() constructs a new stream on which we are safe to call the desired terminal operation.


Use a Supplier to produce a new stream for each terminal operation.

Supplier<Stream<Integer>> streamSupplier = () -> list.stream();

Whenever you need a stream of that collection, use streamSupplier.get() to get a new stream.

Examples:

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);
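Put together as a self-contained sketch, assuming a small example list and two arbitrary predicates (the class name and the helper `secondTerminalOpFails` are made up for this illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamSupplierDemo {
    // Returns true if a second terminal operation on the same stream instance is rejected.
    static boolean secondTerminalOpFails(Stream<Integer> stream) {
        stream.anyMatch(n -> n > 2);     // first terminal operation consumes the stream
        try {
            stream.allMatch(n -> n > 0); // second terminal operation on the same instance
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);
        Supplier<Stream<Integer>> streamSupplier = list::stream;

        // Every get() call returns a fresh stream over the same list.
        boolean any = streamSupplier.get().anyMatch(n -> n > 2);
        boolean all = streamSupplier.get().allMatch(n -> n > 0);

        System.out.println(any + " " + all);                      // true true
        System.out.println(secondTerminalOpFails(list.stream())); // true
    }
}
```

Note that the supplier does not cache anything: each get() re-reads the underlying collection, so this only helps when the source can be traversed more than once.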

We've implemented a duplicate() method for streams in jOOλ, an Open Source library that we created to improve integration testing for jOOQ. Essentially, you can just write:

Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();

Internally, there is a buffer storing all values that have been consumed from one stream but not yet from the other. That's probably as efficient as it gets if your two streams are consumed at about the same rate, and if you can live with the lack of thread-safety.

Here's how the algorithm works:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    // Declared as LinkedList (not List) because offer() and poll() are Queue/Deque methods.
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}


Tuple2 is probably like your Pair type, whereas Seq is Stream with some enhancements.
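For readers without jOOλ on the classpath, here is a JDK-only sketch of the same gap-buffer idea. It is not jOOλ's API: it returns a plain list of two iterators instead of a Tuple2 of Seqs, the class name `StreamDuplicator` is made up, and it adds one step that is an assumption on my part, namely resetting the "ahead" marker once the lagging iterator has drained the gap so that either iterator can take the lead afterwards. Like the version above, it is not thread-safe.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Stream;

class StreamDuplicator {
    // Returns two iterators over the same one-shot stream, sharing a gap buffer.
    static <T> List<Iterator<T>> duplicate(Stream<T> stream) {
        final LinkedList<T> gap = new LinkedList<>(); // values read by the leader only
        final Iterator<T> it = stream.iterator();
        final Object[] ahead = { null };              // which iterator currently leads

        class Duplicate implements Iterator<T> {
            @Override
            public boolean hasNext() {
                if (ahead[0] == null || ahead[0] == this)
                    return it.hasNext();
                return !gap.isEmpty();
            }

            @Override
            public T next() {
                if (ahead[0] == null)
                    ahead[0] = this;
                if (ahead[0] == this) {
                    T value = it.next();
                    gap.offer(value);
                    return value;
                }
                T value = gap.poll();
                if (gap.isEmpty())
                    ahead[0] = null; // assumption: caught up, so nobody leads any more
                return value;
            }
        }

        List<Iterator<T>> result = new ArrayList<>();
        result.add(new Duplicate());
        result.add(new Duplicate());
        return result;
    }

    public static void main(String[] args) {
        List<Iterator<Integer>> copies = duplicate(Stream.of(1, 2, 3));
        Iterator<Integer> a = copies.get(0);
        Iterator<Integer> b = copies.get(1);

        System.out.println(a.next() + " " + a.next());                  // 1 2
        System.out.println(b.next() + " " + b.next() + " " + b.next()); // 1 2 3
        System.out.println(a.next());                                   // 3
        System.out.println(a.hasNext() + " " + b.hasNext());            // false false
    }
}
```

If one iterator runs far ahead of the other, the gap list grows to hold everything in between, which is the buffering cost mentioned earlier.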


You could create a stream of runnables (for example):

results.stream()
    .flatMap(either -> Stream.<Runnable> of(
            () -> failure(either.left()),
            () -> success(either.right())))
    .forEach(Runnable::run);

Where failure and success are the operations to apply. This will however create quite a few temporary objects and may not be more efficient than starting from a collection and streaming/iterating it twice.


Another way to handle the elements multiple times is to use Stream.peek(Consumer):

doSomething().stream()
    .peek(either -> handleFailure(either.left()))
    .forEach(either -> handleSuccess(either.right()));

peek(Consumer) can be chained as many times as needed.

doSomething().stream()
    .peek(element -> handleFoo(element.foo()))
    .peek(element -> handleBar(element.bar()))
    .peek(element -> handleBaz(element.baz()))
    .forEach(element -> handleQux(element.qux()));
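To see how chained peek calls interleave, here is a small sketch that records the order of invocation on a sequential stream. The class name and the "first"/"second"/"last" labels are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PeekChainDemo {
    // Records the order in which the peek and forEach actions run on a sequential stream.
    static List<String> trace(List<String> input) {
        List<String> log = new ArrayList<>();
        input.stream()
             .peek(s -> log.add("first:" + s))
             .peek(s -> log.add("second:" + s))
             .forEach(s -> log.add("last:" + s));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(Arrays.asList("a", "b")));
        // [first:a, second:a, last:a, first:b, second:b, last:b]
    }
}
```

Each element passes through all actions before the next element starts. Keep in mind that the Javadoc describes peek as mainly a debugging aid, and its actions only run for elements that the terminal operation actually pulls through the pipeline, so short-circuiting operations may skip some of them.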

cyclops-react, a library I contribute to, has a static method that allows you to duplicate a Stream (it returns a jOOλ Tuple of Streams).

    Stream<Integer> stream = Stream.of(1,2,3);
    Tuple2<Stream<Integer>,Stream<Integer>> streams =  StreamUtils.duplicate(stream);

As noted in the comments, using duplicate on an existing Stream incurs a performance penalty. A more performant alternative is to use Streamable:

There is also a (lazy) Streamable class that can be constructed from a Stream, Iterable or Array and replayed multiple times.

    Streamable<Integer> streamable = Streamable.of(1,2,3);
    streamable.stream().forEach(System.out::println);
    streamable.stream().forEach(System.out::println);

AsStreamable.synchronizedFromStream(stream) can be used to create a Streamable that lazily populates its backing collection in a way that can be shared across threads. Streamable.fromStream(stream) will not incur any synchronization overhead.