Using scalaz-stream is it possible to split/fork and then rejoin a stream?
As an example, let's say I have the following function
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add)
zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )
With scalaz-stream, in this example the results would be as you expect - a tuple of numbers from 1 to 10 passed to a sink.
However if we replace streamOfNumbers
with something that requires IO, it will actually execute the IO action twice.
Using a Topic
I'm able create a pub/sub process that duplicates elements in the stream correctly, however it does not buffer - it simply consumers the entire source as fast as possible regardless of the pace sinks consume it.
I can wrap this in a bounded Queue, however the end result feels a lot more complex than it needs to be.
Is there a simpler way of splitting a stream in scalaz-stream without duplicate IO actions from the source?
Also to clarify the previous answer delas with the "splitting" requirement. The solution to your specific issue may be without the need of splitting streams:
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val oddOrEven: Process[Task,Int\/Int] = streamOfNumbers.map {
case even if even % 2 == 0 => right(even)
case odd => left(odd)
}
val summed = oddOrEven.pipeW(sump1).pipeO(sump1)
val evenSink: Sink[Task,Int] = ???
val oddSink: Sink[Task,Int] = ???
summed
.drainW(evenSink)
.to(oddSink)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With