 

How do I "split" a stream in fs2?

Tags: scala, fs2

I want to do something like this:

def splitStream[F[_], A, B](stream: fs2.Stream[F, A], split: A => B): (fs2.Stream[F, A], fs2.Stream[F, B]) =
  (stream, stream.map(split))

But this does not work, because it "pulls" from the source twice: once when I drain stream and once when I drain stream.map(split). How do I prevent this? Do I need to switch to a "push"-based model somehow, maintaining my own internal buffer so that I don't pull twice?
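To make the double pull concrete, here is a minimal sketch assuming fs2 2.x with cats-effect 2 (the versions current when this was asked). The object name PullsTwice and the Ref-based counter are illustrative, not from the question. The source bumps a counter each time it produces an element; draining it once directly and once through map runs those effects twice:

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref
import fs2.Stream

object PullsTwice {
  // Counts how many elements the source produces over both runs.
  val program: IO[Int] = for {
    pulls <- Ref.of[IO, Int](0)
    // Hypothetical source: increments the counter for every element it emits.
    source = Stream.emits(List(1, 2, 3)).covary[IO].evalTap(_ => pulls.update(_ + 1))
    _ <- source.compile.drain             // first consumer, like `stream`
    _ <- source.map(_ * 10).compile.drain // second consumer, like `stream.map(split)`
    n <- pulls.get
  } yield n

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync()) // 6: each of the 3 elements was produced twice
}
```

A Stream is only a description, so each compile re-runs the source from scratch, including any side effects it performs.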

pathikrit asked Dec 09 '19 18:12


1 Answer

Somehow switch to a "push" based model by maintaining my own internal buffer so I don't pull twice?

Yes. E.g., you can use a queue from fs2:

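import cats.effect.Concurrent, cats.implicits._, fs2.Stream, fs2.concurrent.Queue

def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
  for {
    q <- Queue.noneTerminated[F, A] // unbounded queue (fs2 2.x); enqueueing None ends q.dequeue
  } yield (
    stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)),
    q.dequeue.map(split)
  )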
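A sketch of how a caller might use the queue-based split, assuming fs2 2.x and cats-effect 2 (fs2.concurrent.Queue and enqueue1 are fs2 2.x APIs; fs2 3.x removed that Queue). The object name and the sample data are hypothetical. The key point is that both halves have to run at the same time, here via cats' parTupled:

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

object SplitWithQueue {
  // cats-effect 2 needs a ContextShift to derive Concurrent[IO] and Parallel[IO].
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // The queue-based split from the answer (fs2 2.x API).
  def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
    Queue.noneTerminated[F, A].map { q =>
      (
        stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)), // producer side
        q.dequeue.map(split)                                                   // consumer side, ends on None
      )
    }

  val program: IO[(List[Int], List[String])] =
    splitStream(Stream.emits(List(1, 2, 3)).covary[IO], (a: Int) => a.toString).flatMap {
      // Run both halves concurrently: the first fills the queue, the second drains it.
      case (as, bs) => (as.compile.toList, bs.compile.toList).parTupled
    }

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync()) // (List(1, 2, 3),List(1, 2, 3))
}
```

The source is pulled only once; the second stream sees its elements through the queue rather than by re-running the source.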

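Of course, the catch here is that if a caller ignores either stream, the other one can get stuck. If the first stream is never run, nothing is ever enqueued, so the second one waits forever and never emits anything. If the second stream is ignored instead, this unbounded queue just keeps growing; with a bounded queue, the first stream would block once the buffer fills. This is the general problem you run into when turning one stream into several while guaranteeing that every value appears in each substream, regardless of when that substream is subscribed to.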
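The stuck case can be reproduced with a copy of the answer's queue-based splitStream, again assuming fs2 2.x and cats-effect 2 (the object name and data are hypothetical). Only the second stream is run, and cats-effect 2's IO.timeout turns the otherwise endless wait into a failure:

```scala
import cats.effect.{Concurrent, ContextShift, IO, Timer}
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object IgnoredSide {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Same queue-based split as in the answer (fs2 2.x API).
  def splitStream[F[_]: Concurrent, A, B](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] =
    Queue.noneTerminated[F, A].map { q =>
      (stream.evalTap(a => q.enqueue1(Some(a))).onFinalize(q.enqueue1(None)), q.dequeue.map(split))
    }

  val program: IO[Either[Throwable, List[String]]] =
    splitStream(Stream.emits(List(1, 2, 3)).covary[IO], (a: Int) => a.toString).flatMap {
      // The first half is ignored, so nothing is ever enqueued and bs would wait forever;
      // the timeout converts that hang into a Left(TimeoutException).
      case (_, bs) => bs.compile.toList.timeout(1.second).attempt
    }

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync().isLeft) // true
}
```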

The solution I usually go for is to combine the consumers into one larger action, using operators like broadcast or parJoin:

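import cats.effect.Concurrent, fs2.Stream

def splitAndRun[F[_]: Concurrent, A](
  base: Stream[F, A],
  runSeveralThings: List[Stream[F, A] => Stream[F, Unit]] // each function is an fs2 Pipe[F, A, Unit]
): F[Unit] =
  base.broadcastTo(runSeveralThings: _*).compile.drain // runs all consumers concurrently over base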
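A minimal usage sketch, assuming fs2 2.x and cats-effect 2. The consumers keepEvens and labelAll, and the Ref-based collection of what they see, are made up for illustration. Each pipe receives every element of base, and base is pulled only once:

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.effect.concurrent.Ref
import fs2.{Pipe, Stream}
import scala.concurrent.ExecutionContext

object SplitAndRunDemo {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // The answer's helper; Stream[F, A] => Stream[F, Unit] is exactly fs2's Pipe[F, A, Unit].
  def splitAndRun[F[_]: Concurrent, A](base: Stream[F, A], runSeveralThings: List[Pipe[F, A, Unit]]): F[Unit] =
    base.broadcastTo(runSeveralThings: _*).compile.drain

  // Two hypothetical consumers that record what they see into a Ref.
  def keepEvens(acc: Ref[IO, List[Int]]): Pipe[IO, Int, Unit] =
    _.filter(_ % 2 == 0).evalMap(n => acc.update(_ :+ n))
  def labelAll(acc: Ref[IO, List[String]]): Pipe[IO, Int, Unit] =
    _.evalMap(n => acc.update(_ :+ s"item-$n"))

  val program: IO[(List[Int], List[String])] = for {
    evens  <- Ref.of[IO, List[Int]](Nil)
    labels <- Ref.of[IO, List[String]](Nil)
    _      <- splitAndRun(Stream.emits(1 to 4).covary[IO], List(keepEvens(evens), labelAll(labels)))
    e      <- evens.get
    l      <- labels.get
  } yield (e, l)

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync()) // (List(2, 4),List(item-1, item-2, item-3, item-4))
}
```

Because the caller hands over every consumer up front, there is no returned stream that could be left unconsumed.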

Here, you know how many consumers you are going to have, so there will not be an ignored stream in the first place.

Oleg Pyzhcov answered Nov 15 '22 21:11