
How to terminate FS2 stream started from SBT Shell?

If I run this program from SBT shell, then cancel it, it will keep printing "hello". I have to exit SBT to make it stop. Why is that?

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object FS2 extends IOApp {

  override def run(args: List[String]) = 
      Stream.awakeEvery[IO](5.seconds).map { _ =>
        println("hello")
      }.compile.drain.as(ExitCode.Error)
}
asked Nov 05 '25 18:11 by Cpt. Senkfuss


1 Answer

As mentioned in the comments, your application runs on another thread and never terminates on its own because the stream is infinite. You therefore have to terminate it yourself when the application receives a signal such as SIGINT (sent when you hit Ctrl+C) or SIGTERM.

You could do something like this:

  1. Create an instance of Deferred.
  2. Pass it to interruptWhen and complete it as soon as either a TERM or an INT signal is received.

For example:

// Imports assume cats-effect 3 and fs2 3 (IO.async_ is a cats-effect 3 API)
import cats.effect.{Deferred, ExitCode, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._
import sun.misc.Signal

object FS2 extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = for {
    // Deferred used as a flag: completed once a termination signal has been received
    cancel <- Deferred[IO, Either[Throwable, Unit]]
    // Wait for INT or TERM, then complete the Deferred. The two signals are nearly
    // identical, so both have to be handled. `start` runs the wait on another fiber;
    // without it the program would block here.
    _ <- (IO.async_[Unit] { cb =>
      Signal.handle(
        new Signal("INT"),
        (sig: Signal) => cb(Right(()))
      )
      Signal.handle(
        new Signal("TERM"),
        (sig: Signal) => cb(Right(()))
      )
    } *> cancel.complete(Right(()))).start
    app <- Stream.awakeEvery[IO](1.seconds).map { _ => // your stream
      println("hello")
    }.interruptWhen(cancel).compile.drain.as(ExitCode.Error) // interruptWhen ends the stream once the Deferred completes
  } yield app

}

This version of the app terminates whenever you hit Ctrl+C in the sbt shell.
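To see the mechanism in isolation, here is a minimal sketch of the same idea using only the JDK, with no cats-effect or FS2 involved. A CountDownLatch plays the role of the Deferred, and a polling loop plays the role of the interrupted stream. The names SignalLatchSketch, installHandlers, loopUntil and demo are made up for this illustration, and it assumes a JVM that exposes sun.misc.Signal and allows handlers for INT and TERM.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import sun.misc.Signal

// Hypothetical helper object, for illustration only.
object SignalLatchSketch {

  // Registers a handler for each named signal; every handler releases the latch.
  def installHandlers(stop: CountDownLatch, names: List[String]): Unit =
    names.foreach { name =>
      Signal.handle(new Signal(name), (_: Signal) => stop.countDown())
    }

  // Waits up to `periodMillis` for the latch. If it was not released in that
  // time, runs `tick` and waits again. Returns how many times `tick` ran.
  def loopUntil(stop: CountDownLatch, periodMillis: Long)(tick: => Unit): Int = {
    var ticks = 0
    while (!stop.await(periodMillis, TimeUnit.MILLISECONDS)) {
      tick
      ticks += 1
    }
    ticks
  }

  // Prints "hello" once per second until INT or TERM arrives.
  def demo(): Unit = {
    val stop = new CountDownLatch(1)
    installHandlers(stop, List("INT", "TERM"))
    val ticks = loopUntil(stop, 1000L)(println("hello"))
    println(s"stopped after $ticks ticks")
  }
}
```

Calling SignalLatchSketch.demo() should behave roughly like the FS2 version above: the loop keeps printing until one of the handlers releases the latch. The latch is checked between ticks, much as interruptWhen checks the Deferred between stream elements.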

answered Nov 07 '25 14:11 by Krzysztof Atłasik


