Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Splitting a scalaz-stream process into two child streams

Using scalaz-stream is it possible to split/fork and then rejoin a stream?

As an example, let's say I have the following function

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)

val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers  = streamOfNumbers.filter(isOdd).fold(0)(add)

zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )

With scalaz-stream, in this example the results would be as you expect - a tuple of numbers from 1 to 10 passed to a sink.

However if we replace streamOfNumbers with something that requires IO, it will actually execute the IO action twice.

Using a Topic I'm able create a pub/sub process that duplicates elements in the stream correctly, however it does not buffer - it simply consumers the entire source as fast as possible regardless of the pace sinks consume it.

I can wrap this in a bounded Queue, however the end result feels a lot more complex than it needs to be.

Is there a simpler way of splitting a stream in scalaz-stream without duplicate IO actions from the source?

like image 779
James Davies Avatar asked Dec 17 '14 09:12

James Davies


1 Answers

Also to clarify the previous answer delas with the "splitting" requirement. The solution to your specific issue may be without the need of splitting streams:

val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val oddOrEven: Process[Task,Int\/Int] = streamOfNumbers.map {
   case even if even % 2 == 0 => right(even)
   case odd => left(odd)
} 
val summed = oddOrEven.pipeW(sump1).pipeO(sump1)

val evenSink: Sink[Task,Int] = ???
val oddSink: Sink[Task,Int] = ???

summed
.drainW(evenSink)
.to(oddSink)
like image 161
Pavel Chlupacek Avatar answered Sep 28 '22 16:09

Pavel Chlupacek