I understand that in general Java streams do not split. However, we have an involved and lengthy pipeline, at the end of which we have two different types of processing that share the first part of the pipeline.
Due to the size of the data, storing the intermediate stream product is not a viable solution. Neither is running the pipeline twice.
Basically, what we are looking for is a solution that is an operation on a stream that yields two (or more) streams that are lazily filled and able to be consumed in parallel. By that, I mean that if stream A is split into streams B and C, when streams B and C consume 10 elements, stream A consumes and provides those 10 elements, but if stream B then tries to consume more elements, it blocks until stream C also consumes them.
Is there any pre-made solution for this problem or any library we can look at? If not, where would we start to look if we want to implement this ourselves? Or is there a compelling reason not to implemented at all?
I don't know about functionality that would fulfill your blocking requirement, but you might be interested in jOOλ's Seq.duplicate() method:
Stream<T> streamA = Stream.of(/* your data here */);
Tuple2<Seq<T>, Seq<T>> streamTuple = Seq.seq(streamA).duplicate();
Stream<T> streamB = streamTuple.v1();
Stream<T> streamC = streamTuple.v2();
The Stream
s can be consumed absolutely independently (including consumption in parallel) thanks to the SeqBuffer
class that's used internally by this method.
Note that:
SeqBuffer
will cache even the elements that are no longer needed because they have already been consumed by both streamB
and streamC
(so if you cannot afford to keep them in memory, it's not a solution for you);streamB
and streamC
will not block one another.Disclaimer: I am the author of the SeqBuffer
class.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With