
Java Streams - Buffering huge streams

I'm trying to collapse several streams backed by huge amounts of data into one, then buffer them. I'm able to collapse these streams into one stream of items with no problem. When I attempt to buffer/chunk the streams, though, it attempts to fully buffer the first stream, which instantly fills up my memory.

It took me a while to narrow the issue down to a minimal test case; the code is below.

I can refactor things such that I don't run into this issue, but without understanding why exactly this blows up, I feel like using streams is just a ticking time bomb.

I took inspiration from Buffer Operator on Java 8 Streams for the buffering.

import java.util.*;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class BreakStreams
{

   //@see https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams
   /**
    * Batch a stream into chunks
    */
   public static <T> Stream<List<T>> buffer(Stream<T> stream, final long count)
   {
      final Iterator<T> streamIterator = stream.iterator();

      return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<List<T>>()
      {
         @Override public boolean hasNext()
         {
            return streamIterator.hasNext();
         }

         @Override public List<T> next()
         {
            List<T> intermediate = new ArrayList<>();
            for (long v = 0; v < count && hasNext(); v++)
            {
               intermediate.add(streamIterator.next());
            }
            return intermediate;
         }
      }, 0), false);
   }

   public static void main(String[] args)
   {

      //create streams from huge datasets
      Stream<Long> streams = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
                                       LongStream.range(0, Integer.MAX_VALUE).boxed())
                                   //collapse into one stream
                                   .flatMap(x -> x);
      //iterating over the stream one item at a time is OK..
//      streams.forEach(x -> {

      //buffering the stream is NOT ok, you will go OOM
      buffer(streams, 25).forEach(x -> {
         try
         {
            Thread.sleep(2500);
         }
         catch (InterruptedException ignore)
         {
         }
         System.out.println(x);
      });
   }
}

asked Apr 09 '20 by Michael Martin

1 Answer

This seems to be connected to the older issue “Why filter() after flatMap() is "not completely" lazy in Java streams?”. While that issue has been fixed for the Stream API's built-in operations, it still seems to exist when we try to iterate over a flat-mapped stream externally.

We can simplify the code reproducing the problem to:

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
    .flatMapToLong(x -> x)
    .iterator().hasNext();

Note that using a Spliterator directly is affected as well:

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
    .flatMapToLong(x -> x)
    .spliterator()
    .tryAdvance((long l) -> System.out.println("first item: "+l));

Both try to buffer elements until ultimately bailing out with an OutOfMemoryError.

Since spliterator().forEachRemaining(…) seems not to be affected, you could implement a solution which works for your use case of forEach, but it would be fragile, as it would still exhibit the problem for short-circuiting stream operations:

// requires java.util.function.Consumer in addition to the imports already used in the question
public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
    boolean parallel = stream.isParallel();
    Spliterator<T> source = stream.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<List<T>>(
            (source.estimateSize()+count-1)/count, source.characteristics()
                &(Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
                    | Spliterator.NONNULL) {
            List<T> list;
            Consumer<T> c = t -> list.add(t);
            // tryAdvance, used by short-circuiting operations, still delegates
            // to source.tryAdvance and therefore keeps the original problem
            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                if(list == null) list = new ArrayList<>(count);
                if(!source.tryAdvance(c)) return false;
                do {} while(list.size() < count && source.tryAdvance(c));
                action.accept(list);
                list = null;
                return true;
            }
            // forEachRemaining delegates to source.forEachRemaining,
            // which is not affected by the flatMap issue
            @Override
            public void forEachRemaining(Consumer<? super List<T>> action) {
                source.forEachRemaining(t -> {
                    if(list == null) list = new ArrayList<>(count);
                    list.add(t);
                    if(list.size() == count) {
                        action.accept(list);
                        list = null;
                    }
                });
                if(list != null) {
                    action.accept(list);
                    list = null;
                }
            }
        }, parallel);
}
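
Used from the question's main method, this could look roughly like the sketch below (it reuses the chunk size of 25 from the question and assumes the buffer method above is in scope). forEach goes through the overridden forEachRemaining, so chunks are emitted lazily; a short-circuiting terminal operation such as findFirst() would still go through tryAdvance and hit the original problem.

//sketch: driving the Spliterator-based buffer with the streams from the question
Stream<Long> streams = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
                                 LongStream.range(0, Integer.MAX_VALUE).boxed())
                             .flatMap(x -> x);

//the first chunk of 25 elements is printed right away instead of exhausting the first stream
buffer(streams, 25).forEach(System.out::println);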

But note that Spliterator-based solutions are preferable in general, as they can carry additional information enabling optimizations and have lower iteration costs in many use cases. So this is the way to go once this issue has been fixed in the JDK code.
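
For instance, with a SIZED source the buffered stream can report the number of chunks up front (a quick illustration, assuming the buffer method above is in scope):

//illustration: a SIZED source propagates a size estimate to the chunked spliterator
Spliterator<List<Long>> sp = buffer(LongStream.range(0, 100).boxed(), 25).spliterator();
System.out.println(sp.estimateSize());                        // 4
System.out.println(sp.hasCharacteristics(Spliterator.SIZED)); // true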

As a workaround, you can use Stream.concat(…) to combine streams, but its documentation explicitly warns against combining too many streams at once:

Use caution when constructing streams from repeated concatenation. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowException [sic].

The throwable’s name has been corrected to StackOverflowError in Java 9’s documentation.
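
For just the two streams from the question, that warning is not a concern; a concat-based variant could look roughly like the sketch below. Since the concatenated stream advances lazily, even the original iterator-based buffer from the question should no longer pull the whole first stream into memory.

//sketch: combining the two huge streams with Stream.concat instead of flatMap
Stream<Long> streams = Stream.concat(LongStream.range(0, Integer.MAX_VALUE).boxed(),
                                     LongStream.range(0, Integer.MAX_VALUE).boxed());
buffer(streams, 25).forEach(System.out::println);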

answered Nov 19 '22 by Holger