I'm trying to collapse several streams backed by huge amounts of data into one, then buffer them. I'm able to collapse these streams into one stream of items with no problem. When I attempt to buffer/chunk the streams, though, it attempts to fully buffer the first stream, which instantly fills up my memory.
It took me a while to narrow the issue down to a minimal test case, but there's some code below.
I can refactor things such that I don't run into this issue, but without understanding why exactly this blows up, I feel like using streams is just a ticking time bomb.
I took inspiration from "Buffer Operator on Java 8 Streams" for the buffering.
import java.util.*;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class BreakStreams
{
    //@see https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams
    /**
     * Batch a stream into chunks
     */
    public static <T> Stream<List<T>> buffer(Stream<T> stream, final long count)
    {
        final Iterator<T> streamIterator = stream.iterator();
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<List<T>>()
        {
            @Override public boolean hasNext()
            {
                return streamIterator.hasNext();
            }

            @Override public List<T> next()
            {
                List<T> intermediate = new ArrayList<>();
                for (long v = 0; v < count && hasNext(); v++)
                {
                    intermediate.add(streamIterator.next());
                }
                return intermediate;
            }
        }, 0), false);
    }

    public static void main(String[] args)
    {
        //create streams from huge datasets
        Stream<Long> streams = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
                                         LongStream.range(0, Integer.MAX_VALUE).boxed())
                                     //collapse into one stream
                                     .flatMap(x -> x);
        //iterating over the stream one item at a time is OK..
        //streams.forEach(x -> {
        //buffering the stream is NOT ok, you will go OOM
        buffer(streams, 25).forEach(x -> {
            try
            {
                Thread.sleep(2500);
            }
            catch (InterruptedException ignore)
            {
            }
            System.out.println(x);
        });
    }
}
This seems to be connected to the older issue “Why filter() after flatMap() is "not completely" lazy in Java streams?”. While that issue has been fixed for the Stream’s built-in operations, it seems to still exist when we try to iterate over a flat-mapped stream externally.
We can simplify the code to reproduce the problem to
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
      .flatMapToLong(x -> x)
      .iterator().hasNext();
Note that using Spliterator is affected as well:
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
      .flatMapToLong(x -> x)
      .spliterator()
      .tryAdvance((long l) -> System.out.println("first item: "+l));
Both try to buffer elements until ultimately bailing out with an OutOfMemoryError.
Since spliterator().forEachRemaining(…) seems not to be affected, you could implement a solution which works for your use case of forEach, but it would be fragile, as it would still exhibit the problem for short-circuiting stream operations.
//needs java.util.*, java.util.function.Consumer, java.util.stream.Stream and java.util.stream.StreamSupport
public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
    boolean parallel = stream.isParallel();
    Spliterator<T> source = stream.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<List<T>>(
            (source.estimateSize()+count-1)/count, source.characteristics()
            &(Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
            | Spliterator.NONNULL) {
            List<T> list;
            Consumer<T> c = t -> list.add(t);

            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                if(list == null) list = new ArrayList<>(count);
                if(!source.tryAdvance(c)) return false;
                do {} while(list.size() < count && source.tryAdvance(c));
                action.accept(list);
                list = null;
                return true;
            }

            @Override
            public void forEachRemaining(Consumer<? super List<T>> action) {
                source.forEachRemaining(t -> {
                    if(list == null) list = new ArrayList<>(count);
                    list.add(t);
                    if(list.size() == count) {
                        action.accept(list);
                        list = null;
                    }
                });
                if(list != null) {
                    action.accept(list);
                    list = null;
                }
            }
        }, parallel);
}
But note that Spliterator-based solutions are preferable in general, as they can carry additional information enabling optimizations and have lower iteration costs in a lot of use cases. So this is the way to go once this issue has been fixed in the JDK code.
As a workaround, you can use Stream.concat(…) to combine streams, but its documentation contains an explicit warning against combining too many streams at once:
Use caution when constructing streams from repeated concatenation. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowException [sic].
The throwable’s name has been corrected to StackOverflowError in Java 9’s documentation.
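For the two sources from the question, a concat-based variant might look like the sketch below (illustrative only, assuming the buffer(…) helper above; not code from the original question or answer). With just two streams, the deep-concatenation caveat does not come into play.
//combine the two huge sources without flatMap; fine for a small, fixed number of streams
Stream<Long> streams = Stream.concat(LongStream.range(0, Integer.MAX_VALUE).boxed(),
                                     LongStream.range(0, Integer.MAX_VALUE).boxed());
buffer(streams, 25).forEach(System.out::println);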