 

Stop the fs2-stream after a timeout

Tags: scala, fs2

I want a function similar to take(n: Int), but in the time dimension: consume(period: Duration). The stream should terminate once the timeout elapses. I know I could compile the stream to something like IO[List[T]] and cancel it, but then I would lose the result. What I really want is to turn an endless stream into a finite one while preserving the results.

Some wider context for the problem: I have an endless stream of events from a message broker, but the credentials for connecting to the broker rotate. So I want to consume the event stream for some time, stop, acquire new credentials, reconnect to the broker (which creates a new stream), and concatenate the two streams into one.

Mikhail Golubtsov asked May 16 '19 08:05



2 Answers

There is a method that does exactly this, interruptAfter:

/**
  * Interrupts this stream after the specified duration has passed.
  */
def interruptAfter[F2[x] >: F[x]: Concurrent: Timer](duration: FiniteDuration): Stream[F2, O]
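
A minimal usage sketch, assuming fs2 1.x/2.x with cats-effect 2 (the versions in which interruptAfter requires Concurrent and Timer). The ticks source, the Credentials type, connect, and rotating are hypothetical stand-ins for the broker in the question, not part of fs2. Because the interruption ends the stream normally instead of cancelling the compiled IO, compile.toList still returns everything emitted before the timeout.

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object InterruptAfterExample {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Hypothetical endless source: emits 0, 1, 2, ... one element every 100 ms.
  val ticks: Stream[IO, Long] =
    Stream.awakeEvery[IO](100.millis).zipWithIndex.map(_._2)

  // The same source, cut off after one second; emitted elements are kept.
  val limited: Stream[IO, Long] = ticks.interruptAfter(1.second)

  // Hypothetical stand-ins for the broker: each "connection" is an endless
  // stream of events tagged with the credentials it was opened with.
  final case class Credentials(id: Int)

  def connect(creds: Credentials): Stream[IO, String] =
    Stream.awakeEvery[IO](100.millis).zipWithIndex.map {
      case (_, i) => s"creds-${creds.id}/event-$i"
    }

  // Acquire credentials, consume for `period`, then repeat with fresh ones.
  // flatMap concatenates the finite segments into a single stream.
  def rotating(period: FiniteDuration): Stream[IO, String] =
    Stream
      .iterate(1)(_ + 1)
      .evalMap(n => IO(Credentials(n))) // placeholder for the real credential fetch
      .flatMap(creds => connect(creds).interruptAfter(period))

  def main(args: Array[String]): Unit = {
    val collected = limited.compile.toList.unsafeRunSync()
    println(s"collected ${collected.size} elements: $collected")

    // take(6) is only here so that the demo terminates.
    rotating(300.millis).take(6).evalMap(e => IO(println(e))).compile.drain.unsafeRunSync()
  }
}
```

The rotation relies on flatMap running the inner streams one after another: each connection is interrupted after period, and only then are the next credentials acquired.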
Mikhail Golubtsov answered Nov 16 '22 10:11



You need something like this:

import cats.effect.{ContextShift, IO, Timer}
import fs2._
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.Random

implicit val ex: ExecutionContext = ExecutionContext.global
implicit val t: Timer[IO] = IO.timer(ex)
implicit val cs: ContextShift[IO] = IO.contextShift(ex)

// Produces one random number per second.
val effect: IO[Long] = IO.sleep(1.second).flatMap(_ => IO {
  val next = Random.nextLong()
  println("NEXT: " + next)
  next
})

// Setting this signal to true interrupts the data stream.
val signal = SignallingRef[IO, Boolean](false).unsafeRunSync()

// After 10 seconds, set the signal and print "Finish".
val timer = Stream.sleep[IO](10.seconds).flatMap(_ =>
  Stream.eval(signal.set(true)).flatMap(_ =>
    Stream.eval(IO(println("Finish")))))

// The data stream runs in the background, so its elements are discarded here.
val stream = timer.concurrently(
  Stream.repeatEval(effect).interruptWhen(signal))

stream.compile.drain.unsafeRunSync()

Also, if you want to keep the produced values as your result, you need something like an unbounded fs2 Queue: publish the data into it and read your result back via queue.stream.
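
As a possible alternative to the queue, here is a hedged sketch (again assuming fs2 1.x/2.x with cats-effect 2) that keeps the signal-based approach but makes the data stream the main stream and runs the timer in the background, so the emitted values become the output. The data source and the durations are made up for illustration.

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object KeepResultsExample {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Hypothetical data source: emits 0, 1, 2, ... one element every 100 ms.
  val data: Stream[IO, Long] =
    Stream.awakeEvery[IO](100.millis).zipWithIndex.map(_._2)

  // The data stream is the main stream, so its elements are the output;
  // the background stream only flips the signal after one second.
  val limited: Stream[IO, Long] =
    Stream.eval(SignallingRef[IO, Boolean](false)).flatMap { signal =>
      val timeout = Stream.sleep[IO](1.second) ++ Stream.eval(signal.set(true))
      data.interruptWhen(signal).concurrently(timeout)
    }

  def main(args: Array[String]): Unit = {
    val collected = limited.compile.toList.unsafeRunSync()
    println(s"kept ${collected.size} elements: $collected")
  }
}
```

Creating the signal inside Stream.eval also avoids the early unsafeRunSync call, since the signal is allocated only when the stream actually runs.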

Mikhail Nemenko answered Nov 16 '22 09:11
