
Parallel Infinite Java Streams run out of Memory

I'm trying to understand why the following Java program gives an OutOfMemoryError, while the corresponding program without .parallel() doesn't.

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

I have two questions:

  1. What is the intended output of this program?

    Without .parallel() it seems to simply output sum(1+2+3+...), which means that it simply "gets stuck" at the first inner stream of the flatMap, which makes sense.

    With parallel I don't know if there is an expected behaviour, but my guess would be that it somehow interleaves the first n or so streams, where n is the number of parallel workers. It could also differ slightly depending on the chunking/buffering behaviour.

  2. What causes it to run out of memory? I'm specifically trying to understand how these streams are implemented under the hood.

    I'm guessing that something blocks the stream, so it never finishes and can never discard the generated values, but I don't quite know in which order things are evaluated and where buffering occurs.
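The sequential observation above can be made concrete with a bounded stand-in (the limits are mine, added so the pipeline stays finite on any Java version; this is not the original program): sequential flatMap emits each inner stream whole, in encounter order, so a limit smaller than the first inner stream never reaches the second.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EncounterOrder {
    // Bounded stand-in for the question's pipeline: flatMap substitutes each
    // outer element with its whole inner stream, in encounter order, before
    // moving on to the next one.
    static List<Integer> firstTwelve() {
        return Stream.iterate(1, i -> i + 1).limit(3)
                .flatMap(k -> Stream.iterate(k, i -> i + k).limit(4))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Inner streams for k = 1, 2, 3, one after another:
        System.out.println(firstTwelve()); // [1, 2, 3, 4, 2, 4, 6, 8, 3, 6, 9, 12]
    }
}
```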

Edit: In case it is relevant, I'm using Java 11.

Edit 2: Apparently the same thing happens even for the simple program IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(), so it might have to do with the laziness of limit rather than flatMap.
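For the simple program in Edit 2, a sized source sidesteps the problem entirely (a sketch of an alternative, not an explanation of the failure, and it does not cover the flatMap case): IntStream.rangeClosed reports its exact size up front, so the parallel pipeline can split the work into chunks with known positions instead of buffering an unknown prefix.

```java
import java.util.stream.IntStream;

public class SizedSum {
    // rangeClosed knows its exact size, so parallel splitting and limit need
    // no buffering; widened to long before summing to avoid int overflow.
    static long sumFirst(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .asLongStream()
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumFirst(100_000_000)); // 5000000050000000
    }
}
```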

asked Jan 31 '20 by Thomas Ahle



2 Answers

You say “but I don't quite know in which order things are evaluated and where buffering occurs”, which is precisely what parallel streams are about. The order of evaluation is unspecified.

A critical aspect of your example is the .limit(100_000_000). This implies that the implementation can’t just sum up arbitrary values, but must sum up the first 100,000,000 numbers. Note that in the reference implementation, .unordered().limit(100_000_000) doesn’t change the outcome, which indicates that there’s no special implementation for the unordered case, but that’s an implementation detail.

Now, when worker threads process the elements, they can't just sum them up, as they have to know which elements they are allowed to consume, which depends on how many elements precede their specific workload. Since this stream doesn't know its size, this can only be known when the prefix elements have been processed, which never happens for infinite streams. So the worker threads keep buffering until this information becomes available.
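The "doesn't know the sizes" point can be probed directly (my own check, not part of the answer): the spliterator behind an iterate stream reports no SIZED characteristic and an unknown estimate, while a range-based stream reports its exact size.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SizeProbe {
    // An iterate-based stream cannot report a size: its spliterator lacks the
    // SIZED characteristic, and estimateSize() is Long.MAX_VALUE ("unknown").
    static boolean iterateKnowsSize() {
        Spliterator<Integer> sp = Stream.iterate(1, i -> i + 1).spliterator();
        return sp.hasCharacteristics(Spliterator.SIZED);
    }

    // A range knows exactly how many elements precede any split-off chunk.
    static long rangeExactSize() {
        return IntStream.rangeClosed(1, 100).spliterator().getExactSizeIfKnown();
    }

    public static void main(String[] args) {
        System.out.println(iterateKnowsSize()); // false
        System.out.println(rangeExactSize());   // 100
    }
}
```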

In principle, when a worker thread knows that it processes the leftmost¹ work-chunk, it could sum up the elements immediately, count them, and signal the end when reaching the limit. So the Stream could terminate, but this depends on a lot of factors.

In your case, a plausible scenario is that the other worker threads are faster in allocating buffers than the leftmost job is counting. In this scenario, subtle changes to the timing could make the stream occasionally return with a value.

When we slow down all worker threads except the one processing the leftmost chunk, we can make the stream terminate (at least in most runs):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ I’m following a suggestion by Stuart Marks to use left-to-right order when talking about the encounter order rather than the processing order.

answered Oct 09 '22 by Holger


My best guess is that adding parallel() changes the internal behavior of flatMap(), which already had problems with lazy evaluation before.

The OutOfMemoryError that you are getting was reported in [JDK-8202307] Getting a java.lang.OutOfMemoryError: Java heap space when calling Stream.iterator().next() on a stream which uses an infinite/very big Stream in flatMap. If you look at the ticket it's more or less the same stack trace that you are getting. The ticket was closed as Won't Fix with the following reason:

The iterator() and spliterator() methods are "escape hatches" to be used when it's not possible to use other operations. They have some limitations because they turn what is a push model of the stream implementation into a pull model. Such a transition requires buffering in certain cases, such as when an element is (flat) mapped to two or more elements. It would significantly complicate the stream implementation, likely at the expense of common cases, to support a notion of back-pressure to communicate how many elements to pull through nested layers of element production.
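The push-to-pull transition the ticket describes can be observed safely on a finite stream (my own sketch; the finite bounds are there precisely so it cannot exhaust the heap): iterator() is the "escape hatch" that adapts the push-based pipeline to pull-based consumption, and with flatMap each pull may force a whole inner stream through the buffer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class IteratorEscapeHatch {
    // iterator() turns the push-based stream pipeline into a pull-based one.
    // With flatMap, pulling one element may buffer the entire inner stream;
    // harmless here because every inner stream has just two elements.
    static List<Integer> pullAll() {
        Iterator<Integer> it = Stream.of(1, 2, 3)
                .flatMap(n -> Stream.of(n, n * 10))
                .iterator();
        List<Integer> out = new ArrayList<>();
        while (it.hasNext()) out.add(it.next());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(pullAll()); // [1, 10, 2, 20, 3, 30]
    }
}
```

With an infinite inner stream, as in the question, that same per-pull buffering is what fills the heap.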

answered Oct 09 '22 by Karol Dowbecki