
Can Java 8 streams cause an O(1) memory reduction on unbounded data to become O(n) memory because of the underlying ForkJoin implementation?

I've written a streams implementation that performs four simple reductions (+ and <) on the lines of a file.

At first I ran four separate streams, but then I wrote my own accumulator and combiner so that I could perform all four reductions in a single stream. On small data sets (10,000,000 lines) this cuts the runtime to about 1/4, as expected, and runs in 14 seconds on my hardware.

fileIn = new BufferedReader(new InputStreamReader(
            new URL(args[0].trim()).openStream()));

final Results results = fileIn.lines()
        .parallel()
        .skip(1)
        .map(User::parse)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Results::new, Results::accumulate, Results::combine);

Results::accumulate and Results::combine correctly combine Users into Results and Results with Results respectively, and this implementation works great for small data sets.
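For readers who have not written a three-argument collect() before, here is a minimal sketch of the shape such a container might take. It is not the Results class from the question: the name StatsSketch, its fields, and the choice of count, sum, min and max over a LongStream are all assumptions made for illustration. The point is only that the container has a fixed size, so the reduction itself needs O(1) memory per worker thread.

```java
import java.util.stream.LongStream;

/** Hypothetical fixed-size mutable container; NOT the question's Results class. */
final class StatsSketch {
    long count;
    long sum;
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;

    /** Accumulator: folds one element into this container. */
    void accumulate(long value) {
        count++;
        sum += value;
        if (value < min) min = value;
        if (value > max) max = value;
    }

    /** Combiner: merges a partial result built by another worker into this one. */
    void combine(StatsSketch other) {
        count += other.count;
        sum += other.sum;
        if (other.min < min) min = other.min;
        if (other.max > max) max = other.max;
    }

    /** Runs all four reductions in one parallel pass over the values. */
    static StatsSketch of(LongStream values) {
        return values.parallel()
                .collect(StatsSketch::new, StatsSketch::accumulate, StatsSketch::combine);
    }
}
```

Calling StatsSketch.of(LongStream.rangeClosed(1, 1000)) yields a count of 1000 and a sum of 500500. The JDK's own LongSummaryStatistics follows the same pattern.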

I also tried .reduce(), with similar results, but I settled on .collect() to reduce the creation of short-lived objects.

The problem is that with real-world data of 1 billion lines I hit an issue that suggests Java 8 streams are incapable of the task. In JConsole the heap climbs roughly linearly to the allocated 12 GB, and then the program fails with an OutOfMemoryError.

I was under the impression that the collector or reducer would provide performance comparable to an iterative solution, which should be bounded by CPU and IO but not by memory, because the reduction step produces a Result that doesn't grow. It's a reduction!

When I take a heap dump and load it into jhat, I see that about 7 GB is taken up by Strings, and these Strings are clearly the lines of the input file. I feel they should not be in memory at all, but jhat shows a very large ForkJoin-related structure being accumulated in memory:

Static reference from java.util.concurrent.ForkJoinPool.common (from class java.util.concurrent.ForkJoinPool) :

--> java.util.concurrent.ForkJoinPool@0x786d41db0 (76 bytes) (field workQueues:)
--> [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598 (144 bytes) (Element 3 of [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598:)
--> java.util.concurrent.ForkJoinPool$WorkQueue@0x786d41ee8 (96 bytes) (field currentSteal:)
--> java.util.stream.SliceOps$SliceTask@0x7b4ac6cb0 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b379ad18 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b25bdb68 (130 bytes) (field leftChild:)
--> java.util.stream.SliceOps$SliceTask@0x7b379acb8 (130 bytes) (field localResult:)
--> java.util.stream.Nodes$SpinedNodeBuilder@0x7b25fdda0 (53 bytes) (field spine:)
--> [[Ljava.lang.Object;@0x7b25ffe48 (144 bytes) (Element 12 of [[Ljava.lang.Object;@0x7b25ffe48:)
--> [Ljava.lang.Object;@0x7b37c4f20 (262160 bytes) (Element 19598 of [Ljava.lang.Object;@0x7b37c4f20:)
--> 31ea87ba876505645342b31928394b3c,2013-11-24T23:02:17+00:00,898,22200,1314,700 (28 bytes) (field value:)
--> [C@0x7b2ffff88 (170 bytes) // <<<< There are thousands of these

There are other references in ApplicationShutdownHooks, Local references, and System Classes, but this one I show is the crux of the problem, and it causes the memory to grow O(n) when

Does the streams implementation make this O(1) memory problem O(n) memory by holding all the Strings in the ForkJoin classes?? I love streams and I don't want this to be so :(

spl asked Nov 09 '16 11:11



1 Answer

Thanks to Marko Topolnik and Holger for coming to the correct answer. Neither posted an answer for me to accept, so I'll try to tie this up for posterity :)

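The .skip(1) is very expensive on a parallel stream because the stream is ordered: skip(n) must drop exactly the first n elements in encounter order, not just any n elements, as per the Javadoc for Stream.skip(). To guarantee that, the parallel implementation buffers elements, which matches the SliceOps$SliceTask and Nodes$SpinedNodeBuilder objects holding the input lines in the heap dump.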

Reading the first line of the BufferedReader before calling .lines() on it does successfully skip the first line in my implementation.
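The header-skipping approach described here can be sketched as follows. This is an illustration rather than the original program: the class name HeaderSkipSketch, the method sumDataLines and the one-integer-per-line input format are assumptions chosen to keep the example small. The header is consumed with readLine() before lines() is called, so the parallel pipeline contains no skip(1).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

/** Hypothetical helper showing a header consumed outside the stream pipeline. */
final class HeaderSkipSketch {

    /** Discards the first line sequentially, then sums the remaining lines in parallel. */
    static int sumDataLines(BufferedReader reader) throws IOException {
        // Consume the header here, before lines() is called, instead of
        // putting a stateful skip(1) into the parallel pipeline.
        reader.readLine();
        return reader.lines()
                .parallel()
                .mapToInt(Integer::parseInt) // would throw if the header were still present
                .sum();
    }

    public static void main(String[] args) throws IOException {
        BufferedReader reader = new BufferedReader(new StringReader("value\n1\n2\n3\n"));
        System.out.println(sumDataLines(reader)); // prints 6
    }
}
```

If the result does not depend on which line is dropped, the Javadoc for Stream.skip() also suggests removing the ordering constraint with unordered(), but that is not equivalent here because the header must be the line that is skipped.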

Removing the .skip() then solves the memory problem: in JConsole the heap bounces around nicely and returns to < 1 GB on every garbage collection, even when the program processes 1 billion lines. This is the desired behaviour and is close enough to O(1) memory for my purposes.

Contrary to a suggestion above, the relative locations of .parallel() and .skip(1) do not matter: you cannot re-order them to make the .skip(1) happen "before" the .parallel(). The builder pattern suggests that ordering is important, and it is for other intermediate operations, but not for this one. I remember this subtlety from my OCP certification materials, but it doesn't appear to be in the Javadoc, hence no reference. I have, however, confirmed this experimentally by making the isolated change and observing the regression in JConsole, along with the associated OOM.
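A small check with isParallel() illustrates why the position of .parallel() makes no difference. The class and method names below are made up for this sketch, and it only inspects the execution mode of the pipeline; it does not measure memory use.

```java
import java.util.stream.Stream;

/** Hypothetical demo: the parallel/sequential mode applies to the whole pipeline. */
final class ParallelFlagSketch {

    /** parallel() called before skip(1). */
    static boolean parallelBeforeSkip() {
        return Stream.of("header", "a", "b").parallel().skip(1).isParallel();
    }

    /** parallel() called after skip(1): the skip stage still runs in parallel. */
    static boolean parallelAfterSkip() {
        return Stream.of("header", "a", "b").skip(1).parallel().isParallel();
    }

    /** A later sequential() call switches the entire pipeline back. */
    static boolean sequentialCalledLast() {
        return Stream.of("header", "a", "b").parallel().skip(1).sequential().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(parallelBeforeSkip());   // prints true
        System.out.println(parallelAfterSkip());    // prints true
        System.out.println(sequentialCalledLast()); // prints false
    }
}
```

Both orderings report a parallel pipeline, so in either case skip(1) executes as part of an ordered parallel stream.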

spl answered Oct 05 '22 08:10