I want to use a function similar to take(n: Int), but in the time dimension: consume(period: Duration). That is, I want a stream to terminate when a timeout occurs. I know that I could compile the stream to something like IO[List[T]] and cancel it, but then I would lose the result. What I really want is to convert an endless stream into a limited one and preserve the results.
More on the wider scope of the problem: I have an endless stream of events from a messaging broker, but I also have rotating credentials for connecting to the broker. So what I want is to consume the stream of events for some time, then stop, acquire new credentials, connect to the broker again (creating a new stream), and concatenate the two streams into one.
There is a method that does exactly this:
/**
* Interrupts this stream after the specified duration has passed.
*/
def interruptAfter[F2[x] >: F[x]: Concurrent: Timer](duration: FiniteDuration): Stream[F2, O]
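As a rough usage sketch, assuming fs2 with cats-effect 2 (the versions where interruptAfter takes Concurrent and Timer constraints, as in the signature above): the endless stream below is a made-up stand-in that emits a counter every 100 ms. Elements emitted before the timeout still reach the consumer, so compiling to a list keeps them.

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

// Hypothetical endless source: emits 0, 1, 2, ... roughly every 100 ms.
val endless: Stream[IO, Long] =
  Stream.repeatEval(IO.sleep(100.millis)).zipWithIndex.map(_._2)

// Stop the stream after one second; already-emitted elements are kept.
val limited: Stream[IO, Long] = endless.interruptAfter(1.second)

val collected: List[Long] = limited.compile.toList.unsafeRunSync()
println(s"Collected ${collected.size} elements before the timeout")
```

The exact number of collected elements depends on timing, so it should not be relied upon.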
You need something like this:
import scala.util.Random
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import fs2._
import fs2.concurrent.SignallingRef

implicit val ex: ExecutionContext = ExecutionContext.global
implicit val t: Timer[IO] = IO.timer(ex)
implicit val cs: ContextShift[IO] = IO.contextShift(ex)

// Produces one random value per second.
val effect: IO[Long] = IO.sleep(1.second).flatMap(_ => IO {
  val next = Random.nextLong()
  println("NEXT: " + next)
  next
})

// Setting this signal to true interrupts the producer below.
val signal = SignallingRef[IO, Boolean](false).unsafeRunSync()

// After 10 seconds, raise the signal and report completion.
val timer = Stream.sleep[IO](10.seconds).flatMap(_ =>
  Stream.eval(signal.set(true)).flatMap(_ =>
    Stream.eval(IO(println("Finish")))))

// The producer runs in the background until the signal becomes true.
val stream = timer.concurrently(
  Stream.repeatEval(effect).interruptWhen(signal))

stream.compile.drain.unsafeRunSync()
Also, the values produced by the background stream are discarded here. If you want to keep them, you need something like an unbounded Queue from fs2: publish the produced data to the queue and read it back as your result via queue.stream.
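For the rotating-credentials scenario from the question, one possible sketch (again assuming fs2 with cats-effect 2) wraps each broker session in interruptAfter and repeats it, so fresh credentials are acquired every time a session ends. Credentials, Event, acquireCredentials and connect are hypothetical stand-ins for the real broker client, not part of fs2.

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

// Hypothetical stand-ins for the broker client.
final case class Credentials(token: Int)
final case class Event(token: Int, seq: Long)

val issued = new AtomicInteger(0)

// Pretend to fetch fresh credentials; each call yields a new token.
val acquireCredentials: IO[Credentials] =
  IO(Credentials(issued.incrementAndGet()))

// Pretend broker connection: an endless stream of events, one every 100 ms.
def connect(creds: Credentials): Stream[IO, Event] =
  Stream.repeatEval(IO.sleep(100.millis)).zipWithIndex.map {
    case (_, i) => Event(creds.token, i)
  }

// One session: acquire credentials, consume events, stop after `ttl`.
def session(ttl: FiniteDuration): Stream[IO, Event] =
  Stream.eval(acquireCredentials).flatMap(c => connect(c).interruptAfter(ttl))

// `repeat` re-runs the session, so credentials are re-acquired each time
// and the sessions are concatenated into a single stream.
val events: Stream[IO, Event] = session(500.millis).repeat

val sample: List[Event] = events.take(12).compile.toList.unsafeRunSync()
println(sample.map(_.token).distinct)
```

In a real client the session length would follow the credential lifetime, and events in flight at the moment of interruption may need separate handling.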