Here is some Scala cats code using the IO Monad:
import java.util.concurrent.{ExecutorService, Executors}
import cats.effect.IO
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.control.NonFatal
object Program extends App {

  type CallbackType = (Either[Throwable, Unit]) => Unit

  // IO.async[Unit] is like a Future that returns Unit on completion.
  // Unlike a regular Future, it doesn't start to run until unsafeRunSync is called.
  def forkAsync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    // "callback" is a function that takes either a Throwable (Left) or whatever toRun returns (Right).
    println("LalalaAsync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        try {
          // toRun() is called inside the try so that an exception it throws reaches the callback.
          val nothing: Unit = toRun() // Note: This line executes the body and returns nothing, which is of type Unit.
          callback(Right(nothing)) // On success, the callback receives nothing (Unit)
        } catch {
          case NonFatal(t) => callback(Left(t)) // On failure, it receives the exception
        }
      }
    })
  }

  def forkSync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.apply {
    println("LalalaSync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        toRun()
      }
    })
  }

  val treadPool: ExecutorService = Executors.newSingleThreadExecutor()
  val mainThread: Thread = Thread.currentThread()
  val Global: ExecutionContextExecutor = ExecutionContext.global

  /*
  Output:
  1 Hello World printed synchronously from Main.main
  LalalaSync: scala-execution-context-global-12
  Hello World printed synchronously from thread pool.pool-1-thread-1
  LalalaAsync: scala-execution-context-global-12
  Hello World printed asynchronously from thread pool.pool-1-thread-1
  2 Hello World printed synchronously from Global .scala-execution-context-global-12
  */
  val program = for {
    _ <- IO {
      println("1 Hello World printed synchronously from Main." + Thread.currentThread().getName) // "main" thread
    }
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- forkSync { () =>
      println("Hello World printed synchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- forkAsync { () =>
      println("Hello World printed asynchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- IO {
      println("2 Hello World printed synchronously from Global ." + Thread.currentThread().getName) // a "scala-execution-context-global-N" thread
    }
  } yield ()

  program.unsafeRunSync()
  treadPool.shutdown() // Let the JVM exit once the pool's tasks have finished
}
To run it, add the following to your build.sbt file:
libraryDependencies ++= Seq(
  "org.typelevel" %% "cats" % "0.9.0",
  "org.typelevel" %% "cats-effect" % "0.3"
)
Note the output:
/*
Output:
1 Hello World printed synchronously from Main.main
LalalaSync: scala-execution-context-global-12
Hello World printed synchronously from thread pool.pool-1-thread-1
LalalaAsync: scala-execution-context-global-12
Hello World printed asynchronously from thread pool.pool-1-thread-1
2 Hello World printed synchronously from Global .scala-execution-context-global-12
*/
Basically, I don't understand how IO.shift(Global) or IO.async works.
For example, why is it that after I call "forkAsync", if I don't call "IO.shift(Global)", the subsequent synchronous IO objects are run in "pool-1-thread-1"? Also, what is the difference between forkAsync and forkSync in this example? Both of them start in ExecutionContext.global and then execute a Runnable in "pool-1-thread-1".
Are forkAsync and forkSync doing exactly the same thing, or is forkAsync doing something different? If they are doing the same thing, what is the point of wrapping code in IO.async? If they are not, how do they differ?
For example, why is it that after I call "forkAsync", if I don't call "IO.shift(Global)", the subsequent synchronous IO objects are run in "pool-1-thread-1"?
The more important question is: why would you expect it to evaluate the "subsequent synchronous IO objects" on global?
IO has no internal notion of thread pools and doesn't know about global, so it cannot shift back to your default thread pool. You do indeed need to trigger a manual shift.
Upgrade to the latest version, 1.0.0, and you also have evalOn in ContextShift, which will execute an IO action on a specified thread pool and then shift back to your "global", which I suppose is what you want.
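As a rough illustration of evalOn, here is a minimal sketch that assumes cats-effect 1.0.0 rather than the 0.3 used in the question. The names EvalOnSketch, poolEc and threadName are hypothetical, and the single-thread executor merely stands in for the question's thread pool.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

object EvalOnSketch {
  // The "default" pool; evalOn shifts back to it once the wrapped action is done.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Hypothetical dedicated pool, standing in for the question's single-thread executor.
  val executor: ExecutorService = Executors.newSingleThreadExecutor()
  val poolEc: ExecutionContext = ExecutionContext.fromExecutor(executor)

  // Reports the name of whichever thread evaluates it.
  val threadName: IO[String] = IO(Thread.currentThread().getName)

  val program: IO[(String, String)] = for {
    onPool <- cs.evalOn(poolEc)(threadName) // evaluated on the dedicated pool
    after  <- threadName                    // evaluated after shifting back
  } yield (onPool, after)

  def main(args: Array[String]): Unit =
    try {
      val (onPool, after) = program.unsafeRunSync()
      println("inside evalOn: " + onPool)
      println("after evalOn:  " + after)
    } finally executor.shutdown() // let the JVM exit
}
```

Under these assumptions, the first name should belong to the dedicated pool and the second to the global execution context, with no explicit IO.shift after the pooled action.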
Also, what is the difference between forkAsync and forkSync in this example?
Your forkSync triggers the execution of the Runnable, but does not wait for its completion. It's fire-and-forget, which means that subsequent chained actions will not wait for it: there is no back-pressure.
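The difference can be made visible with a small experiment. The sketch below assumes cats-effect 1.0.0. ForkSketch and finishedBeforeNextStep are hypothetical names, and the two fork functions are simplified versions of the ones in the question. An IO built with IO.async completes only when its callback is invoked, so the next step in the chain waits for the Runnable and typically continues on the thread that invoked the callback. That would explain why later actions run on "pool-1-thread-1" until an explicit shift.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicBoolean
import cats.effect.IO
import scala.util.control.NonFatal

object ForkSketch {
  // Fire and forget: the IO completes as soon as the task has been submitted.
  def forkSync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] =
    IO(executor.execute(new Runnable { def run(): Unit = toRun() }))

  // The IO completes only when the callback is invoked, i.e. after toRun has finished.
  def forkAsync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] =
    IO.async[Unit] { callback =>
      executor.execute(new Runnable {
        def run(): Unit = {
          val result: Either[Throwable, Unit] =
            try { toRun(); Right(()) } catch { case NonFatal(t) => Left(t) }
          callback(result)
        }
      })
    }

  // Runs a slow task through `fork` and reports whether that task had already
  // finished by the time the next step in the chain was evaluated.
  def finishedBeforeNextStep(fork: (() => Unit) => IO[Unit]): Boolean = {
    val done = new AtomicBoolean(false)
    val slowTask = () => { Thread.sleep(200); done.set(true) }
    fork(slowTask).flatMap(_ => IO(done.get())).unsafeRunSync()
  }

  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadExecutor()
    try {
      println("forkSync waited:  " + finishedBeforeNextStep(forkSync(_)(executor)))
      println("forkAsync waited: " + finishedBeforeNextStep(forkAsync(_)(executor)))
    } finally executor.shutdown() // queued tasks still finish before the pool stops
  }
}
```

With the 200 ms sleep, forkSync is expected to report false (the next step ran before the task finished) and forkAsync true.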
Some advice: upgrade to the latest version (1.0.0).