I'm trying to parallelize some work with Java Streams. Consider this simple example:
Stream.generate(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return generateNewInteger();
        }
    })
    .parallel()
    .forEachOrdered(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) {
            System.out.println(integer);
        }
    });
The problem is that the accept method is never called for forEachOrdered; it only works if I use forEach. I guess the problem is that Stream.generate internally creates an InfiniteSupplyingSpliterator that does not have the ORDERED characteristic.
The first question is: why? It seems like we know in which order the data is generated. The second question is: how can I do forEachOrdered on a parallelized stream whose elements are generated?
The simplest answer is that Stream.generate is unordered because its specification says so.
It's not that the implementation tries to process items in order when possible; it's actually the opposite. Once an operation has been defined to be unordered, the implementation will try to draw a benefit from the unordered nature whenever possible. If you observe something that looks like source order in an unordered operation, either there was no way to benefit from unordered processing, or the implementation simply does not exploit every opportunity yet. Since this might change in a future version or an alternative implementation, you must not rely on the order if the operation has been specified to be unordered.
The intention behind defining Stream.generate as unordered becomes clearer when comparing it with Stream.iterate, which is ordered. The function passed to iterate receives the previous element, so there is a previous-subsequent relationship between the elements, hence an ordering. The supplier passed to Stream.generate does not receive a previous element; in other words, considering the functional signature alone, an element has no relationship to its predecessor. That works for use cases like Stream.generate(() -> constant) or Stream.generate(Type::new), but less so for Stream.generate(instance::statefulOp), which does not seem to be the intended primary use case. It still works if the operation is thread safe and you can live with the unordered nature of the stream.
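You can observe this difference directly by inspecting the spliterator characteristics of the two factory methods. A minimal sketch (the constant supplier and iteration function are just placeholders):

```java
import java.util.Spliterator;
import java.util.stream.Stream;

public class OrderingDemo {
    public static void main(String[] args) {
        // Stream.generate yields an unordered stream:
        boolean generateOrdered = Stream.generate(() -> 42)
                .spliterator()
                .hasCharacteristics(Spliterator.ORDERED);
        System.out.println("generate ordered: " + generateOrdered); // false

        // Stream.iterate yields an ordered stream, because each element
        // is defined in terms of the previous one:
        boolean iterateOrdered = Stream.iterate(0, i -> i + 1)
                .spliterator()
                .hasCharacteristics(Spliterator.ORDERED);
        System.out.println("iterate ordered: " + iterateOrdered); // true
    }
}
```

Note that calling spliterator() on an infinite stream is safe here; it only hands back the source spliterator without consuming elements.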
The reason why your example never makes progress is that the implementation of forEachOrdered does not actually consider the unordered nature, but tries to process the chunks after splitting in encounter order, i.e. all sub-tasks buffer their elements so that they can pass them to the action once the sub-tasks to their left have completed. Of course, buffering and infinite sources don't play well together, especially since the underlying InfiniteSupplyingSpliterator will split into sub-tasks that are themselves infinite. In principle, there is a leftmost task that could feed its elements directly to the action, but that task seems to be somewhere in the queue, waiting to become activated, which will never happen, as all worker threads are already busy processing the other infinite sub-tasks. Eventually, the entire operation will break with an OutOfMemoryError, if you let it run long enough…
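As for the second question: if each element can be generated independently, one way to get ordered parallel processing is to start from an ordered, finite source and derive each element from its index. This is only a sketch under that assumption; generateNewInteger(int) here is a hypothetical index-based variant of the asker's method, not their actual code:

```java
import java.util.stream.IntStream;

public class OrderedParallelGeneration {
    // Hypothetical stand-in for generateNewInteger();
    // here each element is computed from its index.
    static Integer generateNewInteger(int index) {
        return index * index;
    }

    public static void main(String[] args) {
        // IntStream.range is ORDERED and SIZED, so it splits into
        // balanced finite chunks, and forEachOrdered can emit results
        // in encounter order even though the mapping runs in parallel.
        IntStream.range(0, 10)
                .parallel()
                .mapToObj(OrderedParallelGeneration::generateNewInteger)
                .forEachOrdered(System.out::println);
    }
}
```

If the generation is inherently stateful (each element depends on the previous one), a parallel stream won't help the generation step itself; Stream.iterate gives you the ordering, but the supplier calls remain sequential by nature.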