Why is Java Stream generator unordered?

I'm trying to parallelize some work with Java Streams. Consider this simple example:

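import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

// generateNewInteger() is the asker's own method (not shown here).
Stream.generate(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return generateNewInteger();
        }
    })
    .parallel()
    .forEachOrdered(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) {
            System.out.println(integer);
        }
    });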

The problem is that the accept method is never called when I use forEachOrdered; it only works with forEach. I suspect the cause is that Stream.generate internally creates an InfiniteSupplyingSpliterator that does not have the ORDERED characteristic.
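A quick way to check this guess is to ask each stream's source spliterator for its characteristics. The sketch below compares Stream.generate with Stream.iterate; the class and method names are illustrative only. The concrete spliterator class is an implementation detail, but the missing ORDERED flag follows from the specification, which describes Stream.generate as returning an unordered stream.

```java
import java.util.Spliterator;
import java.util.stream.Stream;

public class OrderedCheck {
    // Does the source spliterator behind Stream.generate report ORDERED?
    static boolean generateIsOrdered() {
        return Stream.generate(() -> 42)
                .spliterator()
                .hasCharacteristics(Spliterator.ORDERED);
    }

    // The same check for Stream.iterate, where each element is derived from the previous one.
    static boolean iterateIsOrdered() {
        return Stream.iterate(0, i -> i + 1)
                .spliterator()
                .hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        System.out.println("generate ORDERED: " + generateIsOrdered()); // false
        System.out.println("iterate  ORDERED: " + iterateIsOrdered());  // true
    }
}
```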

Why is that? It seems like we know the order in which the data is generated. My second question: how can I use forEachOrdered on a parallel stream whose elements are generated this way?

Alexander Ponomarev asked Feb 27 '17 16:02





1 Answer

The simplest answer is: Stream.generate is unordered because its specification says so.

It’s not as if the implementation tries to process items in order whenever possible; it’s actually the opposite. Once an operation has been defined as unordered, the implementation will try to benefit from its unordered nature wherever it can. If you observe something that looks like source order in an unordered operation, either there was no way to benefit from unordered processing or the implementation does not yet exploit every opportunity. Since this might change in a future version or an alternative implementation, you must not rely on the order if the operation has been specified as unordered.
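As a minimal sketch of what "you must not rely on the order" means in practice, assume a thread-safe counter (here an AtomicInteger, chosen for illustration) as the supplier. On a parallel, unordered stream, limit(n) is free to keep any n of the generated values, so the only safe assumptions are the element count and, because getAndIncrement never repeats a value, their distinctness.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnorderedLimit {
    // Takes n elements from a parallel Stream.generate fed by a thread-safe counter.
    // Because the source is unordered, limit(n) may keep ANY n generated values,
    // and the resulting list order need not match generation order.
    static List<Integer> takeFromGenerate(int n) {
        AtomicInteger counter = new AtomicInteger();
        return Stream.generate(counter::getAndIncrement)
                .parallel()
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = takeFromGenerate(10);
        // Always 10 distinct values; which ones, and in what order, may vary between runs.
        System.out.println(result.size() + " elements: " + result);
    }
}
```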

The intention behind defining Stream.generate as unordered might become clearer when comparing it with Stream.iterate, which is ordered. The function passed to iterate receives the previous element, so there is a previous/subsequent relationship between the elements and hence an ordering. The supplier passed to Stream.generate doesn’t receive a previous element; in other words, judging by its functional signature alone, it has no relationship to a previous element. That works for use cases like Stream.generate(() -> constant) or Stream.generate(Type::new), but less so for Stream.generate(instance::statefulOp), which does not seem to be the intended primary use case. It still works if the operation is thread-safe and you can live with the unordered nature of the stream.
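For contrast, a small sketch with Stream.iterate: because each element is computed from its predecessor, the stream is ordered, and limit(n) has to return the first n elements in encounter order even when the pipeline runs in parallel. The class and method names are illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OrderedIterate {
    // Each element is derived from the previous one, so the stream is ORDERED and
    // limit(n) must keep exactly the first n elements, in order, even in parallel.
    static List<Integer> firstFromIterate(int n) {
        return Stream.iterate(0, i -> i + 1)
                .parallel()
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstFromIterate(10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Preserving that order in parallel has a cost, since the implementation must track which elements come first; this is exactly the kind of work an unordered source lets it skip.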

The reason your example never makes progress is that the implementation of forEachOrdered does not actually take the unordered nature into account; instead, it tries to process the chunks produced by splitting in encounter order. That is, each sub-task buffers its elements so that it can pass them to the action once all the sub-tasks to its left have completed. Of course, buffering and infinite sources don’t play well together, especially since the underlying InfiniteSupplyingSpliterator splits into sub-tasks that are themselves infinite. In principle, there is a leftmost task that could feed its elements directly to the action, but that task seems to be sitting somewhere in the queue, waiting to be activated, which never happens because all worker threads are already busy processing the other infinite sub-tasks. If you let it run long enough, the entire operation will eventually fail with an OutOfMemoryError.
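The answer does not spell out a workaround for the second question. One possible approach, assuming a bounded number of elements is acceptable and each element can be produced independently, is to drive the generation from a finite, ordered, sized source such as IntStream.range instead of Stream.generate. In this sketch, produce(int) is a hypothetical stand-in for the asker's generateNewInteger(); it computes the value from the index so that it needs no shared mutable state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class OrderedGeneration {
    // Hypothetical stand-in for generateNewInteger(): derives the value from the index.
    static Integer produce(int index) {
        return index * index;
    }

    // IntStream.range is ORDERED and SIZED, so produce() can run in parallel
    // while forEachOrdered still delivers the results in index order.
    static List<Integer> generateOrdered(int n) {
        List<Integer> out = new ArrayList<>();
        IntStream.range(0, n)
                .parallel()
                .mapToObj(OrderedGeneration::produce)
                // Each action happens-before the next, so a plain ArrayList is safe here.
                .forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(generateOrdered(5)); // [0, 1, 4, 9, 16]
    }
}
```

Because the source is finite, the buffering described above stays bounded. The trade-off is that the element count must be known up front.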

Holger answered Oct 13 '22 05:10
