I want to do something like this:
```scala
def splitStream[F[_], A, B](stream: fs2.Stream[F, A], split: A => B): (fs2.Stream[F, A], fs2.Stream[F, B]) =
  (stream, stream.map(split))
```
But this does not work, because the source is "pulled" twice: once when I drain `stream` and once when I drain `stream.map(split)`. How do I prevent this? Somehow switch to a "push"-based model by maintaining my own internal buffer so I don't pull twice?
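The double pull can be reproduced with a small sketch. It assumes fs2 2.x and cats-effect 2; the source, the counter and the object name are hypothetical. Because an fs2 `Stream` is a description that is re-evaluated every time it is compiled, draining both halves of the naive pair runs the source's effects once per half.

```scala
import java.util.concurrent.atomic.AtomicInteger
import cats.effect.IO
import fs2.Stream

object PullTwiceDemo {
  // Hypothetical counter: how many elements the effectful source actually produces
  val pulls = new AtomicInteger(0)

  // Hypothetical source: three elements, each one bumps the counter when produced
  val source: Stream[IO, Int] =
    Stream.range(1, 4).evalTap(_ => IO(pulls.incrementAndGet()))

  // The naive split from the question: both halves share the same recipe
  val (as, bs) = (source, source.map(_ * 10))

  // Draining both halves evaluates the source once per half, so the counter ends at 6, not 3
  def drainBoth: IO[Int] =
    for {
      _ <- as.compile.drain
      _ <- bs.compile.drain
      n <- IO(pulls.get)
    } yield n
}
```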
> Somehow switch to a "push"-based model by maintaining my own internal buffer so I don't pull twice?
Yes. E.g., you can use a queue from fs2:
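```scala
import cats.effect.Concurrent, cats.syntax.functor._
import fs2.Stream, fs2.concurrent.Queue
// fs2 2.x: the first stream pushes each element (then None) into the queue; the second drains it
def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
  for {
    q <- Queue.noneTerminated[F, A]
  } yield (stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)), q.dequeue.map(split))
```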
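Of course, the catch is that the second stream depends on the first one being run. If the caller never runs the first stream (or drains the second one before it), nothing ever reaches the queue and the second stream waits forever without emitting anything; if the caller ignores the second stream instead, the unbounded queue just keeps growing. This is the general problem you run into when turning one stream into several and requiring every value to appear in each substream, regardless of when each one is subscribed to.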
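One way to use the queue version safely is to run both halves together as a single stream, so neither can be forgotten. The sketch below assumes fs2 2.x and cats-effect 2 (hence the explicit `ContextShift` needed for `Concurrent[IO]`); the object name and the example input are hypothetical. It joins the draining source half with the queue half via `parJoinUnbounded`, so only the split values are emitted.

```scala
import scala.concurrent.ExecutionContext
import cats.effect.{Concurrent, ContextShift, IO}
import cats.syntax.functor._
import fs2.Stream
import fs2.concurrent.Queue

object QueueSplitUsage {
  // cats-effect 2: Concurrent[IO] requires a ContextShift in implicit scope
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // The queue-based split from the answer (fs2 2.x API)
  def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
    Queue.noneTerminated[F, A].map { q =>
      (stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)), q.dequeue.map(split))
    }

  // Run both halves concurrently; the source half is drained, the queue half emits a * 10
  val program: IO[List[Int]] =
    splitStream(Stream.range(1, 4).covary[IO], (a: Int) => a * 10).flatMap { case (as, bs) =>
      Stream(as.drain, bs).parJoinUnbounded.compile.toList
    }
}
```

The source half's finalizer enqueues `None` once it completes, which terminates the queue half, so the joined stream finishes on its own.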
The solution I usually go for is to restructure the code so that all the consumers run as part of one larger action, using operators like `broadcast` or `parJoin`:
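```scala
import cats.effect.Concurrent
import fs2.Stream

def splitAndRun[F[_]: Concurrent, A](
  base: Stream[F, A],
  runSeveralThings: List[Stream[F, A] => Stream[F, Unit]] // one pipe per consumer
): F[Unit] =
  base.broadcastTo(runSeveralThings: _*).compile.drain // every element reaches every consumer
```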
Here, you know how many consumers you are going to have, so there will not be an ignored stream in the first place.
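A usage sketch of `splitAndRun`, assuming fs2 2.x and cats-effect 2 (`cats.effect.concurrent.Ref`, explicit `ContextShift`). The `recordTo` consumer, the object name and the input are hypothetical: each consumer simply records what it sees, which makes it easy to check that both receive the full sequence from a single run of the source.

```scala
import scala.concurrent.ExecutionContext
import cats.effect.{Concurrent, ContextShift, IO}
import cats.effect.concurrent.Ref
import fs2.{Pipe, Stream}

object BroadcastUsage {
  // cats-effect 2: Concurrent[IO] requires a ContextShift in implicit scope
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def splitAndRun[F[_]: Concurrent, A](base: Stream[F, A], runSeveralThings: List[Pipe[F, A, Unit]]): F[Unit] =
    base.broadcastTo(runSeveralThings: _*).compile.drain

  // Hypothetical consumer: appends every element it receives to its own Ref
  def recordTo(ref: Ref[IO, List[Int]]): Pipe[IO, Int, Unit] =
    s => s.evalMap(i => ref.update(seen => seen :+ i))

  // Two consumers over one source; each should observe the whole sequence
  val program: IO[(List[Int], List[Int])] =
    for {
      seenA <- Ref.of[IO, List[Int]](Nil)
      seenB <- Ref.of[IO, List[Int]](Nil)
      _     <- splitAndRun(Stream.range(1, 4).covary[IO], List(recordTo(seenA), recordTo(seenB)))
      a     <- seenA.get
      b     <- seenB.get
    } yield (a, b)
}
```

Since `broadcastTo` waits until all the given pipes are subscribed before emitting, no consumer misses the first elements.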