
Parallelism with Streams created from Iterators

While experimenting with streams, I ran into behavior that I don't quite understand. I created a parallel stream from an iterator and noticed that it did not seem to exhibit any parallelism. In the example below, I print a counter to the console for two parallel streams, one created from an iterator and the other from a list. The stream created from the list behaved as I expected, printing the counter in non-sequential order, but the stream created from the iterator printed the counter in sequential order. Am I creating the parallel stream from an iterator incorrectly?

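    import java.util.Iterator;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import java.util.stream.StreamSupport;

    // The class name is arbitrary; it only makes the snippet compilable.
    public class IteratorStreamTest {
        // Shared counter, updated from the stream without synchronization.
        private static int counter = 0;

        public static void main(String[] args) {
            List<Integer> lstr = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
            Iterator<Integer> iter = lstr.iterator();

            // Parallel stream built from an iterator of unknown size.
            System.out.println("Iterator Stream: ");
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.IMMUTABLE | Spliterator.CONCURRENT), true).forEach(i -> {
                System.out.print(counter + " ");
                counter++;
            });

            // Parallel stream built directly from the list.
            counter = 0;
            System.out.println("\nList Stream: ");
            lstr.parallelStream().forEach(i -> {
                System.out.print(counter + " ");
                counter++;
            });
        }
    }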
Johnathan asked Jan 17 '18 17:01




2 Answers

There is no guarantee that parallel processing prints the counter in non-sequential order. Further, since you’re updating a variable without synchronization, it’s possible to miss updates made by other threads, so the results may be entirely inconsistent.
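
To take the data race out of the experiment, the shared counter could be replaced with an AtomicInteger. The following is only a minimal sketch; the class and method names (SafeCounter, countInParallel) are hypothetical and chosen for illustration. The order in which elements are visited remains unspecified, but no increments are lost.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class, not part of the original question.
public class SafeCounter {
    // Counts the elements of a parallel stream with an atomic counter,
    // so increments made by different worker threads are never lost.
    static int countInParallel(List<Integer> values) {
        AtomicInteger counter = new AtomicInteger();
        values.parallelStream().forEach(i -> counter.incrementAndGet());
        return counter.get();
    }

    public static void main(String[] args) {
        List<Integer> values = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        System.out.println(countInParallel(values));
    }
}
```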

Besides that, an Iterator has to be polled sequentially, so to get at least some gain from parallel processing, elements have to be buffered, but without a known size, there is no good estimate of how many elements to buffer. The default strategy buffers more than a thousand elements at a time and does not split the work well.

So if you use more than a thousand elements, you might notice more parallel activity. Alternatively, you may specify a size using StreamSupport.stream(Spliterators.spliterator(iter, lstr.size(), 0), true) to construct the stream. Then, the internally used buffering will be adapted.
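
The sized variant can be tried out with a sketch along these lines. The class and method names (SizedIteratorStream, parallelStreamOf) are hypothetical; only Spliterators.spliterator and StreamSupport.stream come from the JDK. How many worker threads actually show up depends on the machine and the JDK version, so no particular output should be expected.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class, for illustration only.
public class SizedIteratorStream {
    // Wraps an iterator in a parallel stream and passes along the known element count.
    static <T> Stream<T> parallelStreamOf(Iterator<T> iterator, long size) {
        return StreamSupport.stream(Spliterators.spliterator(iterator, size, 0), true);
    }

    public static void main(String[] args) {
        List<Integer> lstr = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());

        // Record which threads processed at least one element.
        Set<String> threads = ConcurrentHashMap.newKeySet();
        parallelStreamOf(lstr.iterator(), lstr.size())
                .forEach(i -> threads.add(Thread.currentThread().getName()));
        System.out.println("Threads used: " + threads);
    }
}
```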

Still, the List’s stream will be processed more efficiently in parallel, as it not only knows its size but also supports splitting the workload by exploiting the random access nature of the underlying data structure.
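
The difference in splitting can be made visible by calling trySplit() directly and counting what lands in the split-off part. This is a rough sketch with hypothetical names (SplitComparison, listPrefixCount, iteratorPrefixCount); the exact split sizes are implementation details of the JDK in use, not something the API promises. With an ArrayList, the first split is expected to hand off about half of the elements, whereas the iterator-backed spliterator copies a whole batch into an array first, which for a short input means everything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class, for illustration only.
public class SplitComparison {
    // Counts the elements a spliterator still holds (0 if it is null).
    static long count(Spliterator<Integer> spliterator) {
        if (spliterator == null) {
            return 0;
        }
        long[] n = {0};
        spliterator.forEachRemaining(x -> n[0]++);
        return n[0];
    }

    // Number of elements in the first part split off an ArrayList's spliterator.
    static long listPrefixCount(List<Integer> values) {
        return count(new ArrayList<>(values).spliterator().trySplit());
    }

    // Number of elements in the first part split off an iterator-backed
    // spliterator of unknown size.
    static long iteratorPrefixCount(List<Integer> values) {
        return count(Spliterators.spliteratorUnknownSize(values.iterator(), 0).trySplit());
    }

    public static void main(String[] args) {
        List<Integer> values = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        System.out.println("List prefix: " + listPrefixCount(values));
        System.out.println("Iterator prefix: " + iteratorPrefixCount(values));
    }
}
```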

Holger answered Oct 14 '22 15:10


The current implementation will try to parallelize a stream produced from an iterator by buffering up the values and dispatching them to several threads, but this only kicks in if the stream is long enough. Increase your list to 10000 elements and you should see parallelism.

With a large list, it might be easier to see how elements are distributed across threads if you collect into a map grouped by thread. Replace your .forEach with .collect(Collectors.groupingBy(x -> Thread.currentThread().getName(), Collectors.counting()))
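
Put together, that suggestion might look like the sketch below. The class and method names (ThreadDistribution, countByThread) are hypothetical, and the characteristics argument is set to 0 here as a simplifying assumption. Which threads appear and how many elements each one gets will vary from run to run; only the total is fixed.

```java
import java.util.List;
import java.util.Map;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class, for illustration only.
public class ThreadDistribution {
    // Groups the stream's elements by the name of the thread that processed them
    // and counts how many elements each thread handled.
    static Map<String, Long> countByThread(Stream<Integer> stream) {
        return stream.collect(Collectors.groupingBy(
                x -> Thread.currentThread().getName(),
                Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Integer> lstr = IntStream.rangeClosed(1, 10000).boxed().collect(Collectors.toList());

        // Parallel stream over an iterator of unknown size, as in the question.
        Stream<Integer> iteratorStream = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(lstr.iterator(), 0), true);

        System.out.println("Iterator stream: " + countByThread(iteratorStream));
        System.out.println("List stream: " + countByThread(lstr.parallelStream()));
    }
}
```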

Misha answered Oct 14 '22 16:10