
Scala Cats Effects - IO Async Shift - How Does it Work?

Here is some Scala cats code using the IO Monad:

import java.util.concurrent.{ExecutorService, Executors}

import cats.effect.IO

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.control.NonFatal

object Program extends App {

  type CallbackType = (Either[Throwable, Unit]) => Unit

  // IO.async[Unit] is like a Future that returns Unit on completion.
  // Unlike a regular Future, it doesn't start to run until unsafeRunSync is called.
  def forkAsync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    // "callback" is a function that either takes a throwable (Left) or whatever toRun returns (Right).
    println("LalalaAsync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        // Run the body inside the try so that an exception thrown by toRun is reported as a Left.
        val result: Either[Throwable, Unit] =
          try Right(toRun())
          catch { case NonFatal(t) => Left(t) }
        callback(result) // Signal completion exactly once: Right on success, Left on failure
      }
    })
  }

  def forkSync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.apply {
    println("LalalaSync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        toRun()
      }
    })
  }

  val treadPool: ExecutorService = Executors.newSingleThreadExecutor()
  val mainThread: Thread = Thread.currentThread()

  val Global: ExecutionContextExecutor = ExecutionContext.global

  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
   */
  val program = for {
    _ <- IO {
      println("1 Hello World printed synchronously from Main." + Thread.currentThread().getName) // "main" thread
    }
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- forkSync { () =>
      println("Hello World printed synchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- forkAsync { () =>
      println("Hello World printed asynchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- IO {
      println("2 Hello World printed synchronously from Global ." + Thread.currentThread().getName) // a "scala-execution-context-global-N" thread
    }
  } yield ()

  program.unsafeRunSync()
}

To run it, add the following to your build.sbt file:

libraryDependencies ++= Seq(
  "org.typelevel" %% "cats" % "0.9.0",
  "org.typelevel" %% "cats-effect" % "0.3"
)

Note the output:

  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
 */

Basically, I don't understand how IO.shift(Global) or IO.async works.

For example, why is it that after I call "forkAsync", if I don't call "IO.shift(Global)", the subsequent synchronous IO objects are run in "pool-1-thread-1"? Also, what is the difference between forkAsync and forkSync in this example? Both of them start in the ExecutionContext.global and then execute a Runnable in "pool-1-thread-1".

Are forkAsync and forkSync doing exactly the same thing, or is forkAsync doing something different? If they are doing the same thing, what is the point of wrapping code in IO.async? If they are not doing the same thing, how are they different?

Asked by Michael Lafayette on Sep 21 '18 at 06:09



1 Answer

For example, why is it that after I call "forkAsync", if I don't call "IO.shift(Global)", the subsequent synchronous IO objects are run in "pool-1-thread-1".

The more important question is: why would you expect it to evaluate the "subsequent synchronous IO objects" on global?

IO has no internal notion of thread pools. It doesn't know about global, so it cannot shift back to your default thread pool on its own. After forkAsync, the rest of the chain simply continues on whichever thread invoked the callback, here "pool-1-thread-1", so you do indeed need to trigger a manual shift.
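
To make the point about threads concrete, here is a minimal sketch that uses only the standard library, not cats-effect. The names CallbackThreadDemo, runAsync and continuationThread are made up for illustration. It models the callback style that IO.async wraps: whatever code runs "after" the asynchronous step is executed by the thread that invokes the callback.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicReference
import scala.util.control.NonFatal

object CallbackThreadDemo {

  // Hypothetical helper: runs `body` on `pool`, then invokes `callback`
  // from that same pool thread, the way forkAsync does in the question.
  def runAsync(pool: ExecutorService)(body: () => Unit)(
      callback: Either[Throwable, Unit] => Unit): Unit =
    pool.execute(new Runnable {
      def run(): Unit = {
        val result: Either[Throwable, Unit] =
          try Right(body())
          catch { case NonFatal(t) => Left(t) }
        callback(result)
      }
    })

  // Returns the name of the thread on which the continuation ran.
  def continuationThread(): String = {
    val pool = Executors.newSingleThreadExecutor()
    val done = new CountDownLatch(1)
    val name = new AtomicReference[String]("")
    try {
      runAsync(pool)(() => ()) { _ =>
        // This is "the rest of the program": nothing moved it off the pool thread.
        name.set(Thread.currentThread().getName)
        done.countDown()
      }
      done.await()
      name.get()
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    println("Continuation ran on: " + continuationThread())
}
```

The continuation reports a thread from the single-thread pool, not the caller's thread, because nothing in the callback chain knows about any other pool.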

If you upgrade to the latest version, 1.0.0, you also get evalOn in ContextShift, which executes an IO action on a specified thread pool and then shifts back to your "global" pool, which I suppose is what you want.
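
As a rough sketch of what that could look like, assuming cats-effect 1.x on the classpath (the object name EvalOnSketch and the helper threadName are made up for illustration):

```scala
import java.util.concurrent.Executors

import cats.effect.{ContextShift, IO}

import scala.concurrent.ExecutionContext

object EvalOnSketch {

  // The ContextShift is what carries the "default" pool that IO alone does not know about.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def threadName: IO[String] = IO(Thread.currentThread().getName)

  // Returns (thread used inside evalOn, thread used by the step after it).
  def run(): (String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    val poolEc = ExecutionContext.fromExecutor(pool)

    val program = for {
      _      <- cs.shift                      // start on the default (global) pool
      onPool <- cs.evalOn(poolEc)(threadName) // run this one action on the dedicated pool
      after  <- threadName                    // evalOn has shifted back, no manual IO.shift
    } yield (onPool, after)

    try program.unsafeRunSync()
    finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Under these assumptions the first name should belong to the dedicated pool and the second to the global pool, without an explicit IO.shift(Global) after the pooled step.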

Also, what is the difference between forkAsync and forkSync in this example?

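Your forkSync triggers the execution of the Runnable but does not wait for its completion. It's fire-and-forget, which means that subsequent chained actions will not be back-pressured. forkAsync, by contrast, completes only when the callback is invoked, so the next step in the chain waits for the Runnable to finish.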
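
The ordering difference can be illustrated with a small standard-library sketch (no cats-effect; FireAndForgetDemo and its method names are hypothetical). It only models the two behaviours: the latches stand in for "the next step in the chain", and the extra release latch exists purely to make the fire-and-forget ordering deterministic.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors}

object FireAndForgetDemo {

  // Models forkSync: submit the task and carry on immediately.
  def fireAndForget(): List[String] = {
    val pool = Executors.newSingleThreadExecutor()
    val log = new ConcurrentLinkedQueue[String]()
    val release = new CountDownLatch(1)  // holds the task back so the ordering is deterministic
    val finished = new CountDownLatch(1) // only used to collect the log at the end
    pool.execute(new Runnable {
      def run(): Unit = {
        release.await()
        log.add("task done")
        finished.countDown()
      }
    })
    log.add("next step") // the chain moves on while the task is still pending
    release.countDown()
    finished.await()
    pool.shutdown()
    log.toArray(new Array[String](0)).toList
  }

  // Models forkAsync: the next step runs only after the task signals completion.
  def waitForCallback(): List[String] = {
    val pool = Executors.newSingleThreadExecutor()
    val log = new ConcurrentLinkedQueue[String]()
    val completed = new CountDownLatch(1) // plays the role of the async callback
    pool.execute(new Runnable {
      def run(): Unit = {
        log.add("task done")
        completed.countDown()
      }
    })
    completed.await()
    log.add("next step")
    pool.shutdown()
    log.toArray(new Array[String](0)).toList
  }

  def main(args: Array[String]): Unit = {
    println(fireAndForget())
    println(waitForCallback())
  }
}
```

In the first method "next step" is logged before "task done"; in the second the order is reversed, which is the back-pressure that forkSync lacks.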

Some advice:

  1. upgrade to the latest version (1.0.0)
  2. read the docs at: https://typelevel.org/cats-effect/datatypes/io.html

Answered by Alexandru Nedelcu on Sep 29 '22 at 05:09