
How to shutdown a fs2.StreamApp programmatically?

Tags: scala, fs2, http4s

Extending StreamApp requires you to implement the stream method, which takes a requestShutdown parameter:

def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode]

I provide the implementation for this and understand that args carries the command-line arguments. I'm unsure, however, what supplies the requestShutdown parameter and what I can do with it.

Specifically, I'd like to invoke a graceful shutdown on a Stream[IO, ExitCode] that starts an http4s server (which blocks forever).

It looks like a Signal is needed and must be set? The underlying stream that I'm trying to 'get at' looks like this:

for {
  scheduler <- Scheduler[IO](corePoolSize = 1)
  exitCode  <- BlazeBuilder[IO]
                 .bindHttp(port, "0.0.0.0")
                 .mountService(services(scheduler), "/")
                 .serve
} yield exitCode

My stream def is here, and StreamAppSpec from the fs2 project has a related example, but I can't work out how I'd adapt it.

Toby asked Mar 06 '18 21:03



1 Answer

You can think of the requestShutdown parameter supplied to the stream function as an action that, when executed, requests the termination of the program. StreamApp passes it in for you; your code only decides when to run it, and running it ends the program.

Here is an example use:

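  // Requires: import scala.concurrent.duration._ (for 10.seconds)
  // and an implicit ExecutionContext in scope (for sleep and joinUnbounded).
  override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
    for {
      scheduler <- Scheduler[IO](corePoolSize = 1)
      // Waits 10 seconds, then runs the shutdown action.
      exitStream = scheduler.sleep[IO](10.seconds)
        .evalMap(_ => requestShutdown)
        .map(_ => ExitCode.Success)
      // Runs the http4s server; it never terminates on its own.
      serverStream = BlazeBuilder[IO]
        .bindHttp(port, "0.0.0.0")
        .mountService(services(scheduler), "/")
        .serve
      // Runs both streams concurrently.
      result <- Stream.emits(List(exitStream, serverStream)).joinUnbounded
    } yield result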

In this scenario, we create two streams:

  • The first will wait for 10 seconds before triggering the effect of
    terminating the app.

  • The second will run the http4s server.

We then join these two streams so that they run concurrently. As a result, the web server runs for 10 seconds before the other stream signals that the program should terminate.
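For intuition about what supplies requestShutdown and why running it ends a server that never finishes on its own, here is a minimal library-free model using only the Scala standard library. It is a sketch of the idea, not fs2's implementation: ShutdownModel, runApp and demo are hypothetical names, a Promise stands in for the shutdown signal, and a Future that never completes stands in for the server stream. Unlike fs2, this model does not interrupt or clean up the "server"; it only shows who owns the signal and who sets it.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ShutdownModel {
  // Hypothetical stand-in for StreamApp: it owns the shutdown signal and
  // passes the program an action that sets it.
  def runApp(program: (() => Unit) => Future[Int]): Int = {
    val shutdownRequested = Promise[Unit]()
    // The "requestShutdown" action. trySuccess makes repeated calls harmless.
    val requestShutdown: () => Unit = () => { shutdownRequested.trySuccess(()); () }
    val finished: Future[Int] = program(requestShutdown)
    // The app ends when the program finishes or when shutdown is requested,
    // whichever comes first. Exit code 0 stands in for ExitCode.Success.
    val exit = Future.firstCompletedOf(
      Seq(finished, shutdownRequested.future.map(_ => 0))
    )
    Await.result(exit, 5.seconds)
  }

  def demo(): Int = runApp { requestShutdown =>
    // Stands in for the server stream: a Future that never completes.
    val server: Future[Int] = Promise[Int]().future
    // Stands in for exitStream: wait briefly, then request shutdown.
    Future { Thread.sleep(100); requestShutdown() }
    server
  }
}
```

Calling ShutdownModel.demo() returns 0 after roughly 100 ms, even though the "server" itself never completes. The same division of labour applies in the answer above: the framework creates the signal, and the program chooses the moment to trigger it.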

TheInnerLight answered Oct 16 '22 14:10
