I'm somewhat new to Scala and ZIO and have run into something of an odd puzzle.
I would like to setup a ZIO Environment containing a ZIO Queue and later
have different ZIO Tasks offer
and take
from this shared Queue.
I tried defining my environment like this
trait MainEnv extends Console with Clock
{
val mainQueue = Queue.unbounded[String]
}
and accessing the queue from separate tasks like this
for {
env <- ZIO.environment[MainEnv]
queue <- env.mainQueue
...
but in my test I observe each of my separate tasks is given a separate Queue instance.
Looking at the signature for unbounded
def unbounded[A]: UIO[Queue[A]]
I observe it doesn't immediately return a Queue but rather returns an effect which returns a queue so while the observed behavior makes sense it isn't at all what I was hoping for and I don't see a clear way to get the behavior I would like.
Would appreciate any suggestions as to how to achieve my goal of setting up different tasks communicating via a shared queue stored in the Environment.
For reference here is my code and output.
bash-3.2$ sbt run
[info] Loading project definition from /private/tmp/env-q/project
[info] Loading settings for project root from build.sbt ...
[info] Set current project to example (in build file:/private/tmp/env-q/)
[info] Compiling 1 Scala source to /private/tmp/env-q/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /private/tmp/env-q/target/scala-2.12/example_2.12-0.0.1-SNAPSHOT.jar ...
[info] Done packaging.
[info] Running example.Main
env example.Main$$anon$1@36fbcafd queue zio.Queue$$anon$1@65b9a444
env example.Main$$anon$1@36fbcafd queue zio.Queue$$anon$1@7c050764
(hangs here - notice env object is the same but queue objects are different so second task is stuck)
Here is my complete test which is based on example from slide 37 of https://www.slideshare.net/jdegoes/zio-queue
package example
import zio.{App, Queue, ZIO}
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console._
trait MainEnv extends Console with Clock // environment with queue
{
val mainQueue = Queue.unbounded[String]
}
object Main extends App // main test
{
val task1 = for { // task to add something to the queue
env <- ZIO.environment[MainEnv]
queue <- env.mainQueue
_ <- putStrLn(s"env $env queue $queue")
_ <- queue.offer("Give me Coffee!")
} yield ()
val task2 = for { // task to remove+print stuff from queue
env <- ZIO.environment[MainEnv]
queue <- env.mainQueue
_ <- putStrLn(s"env $env queue $queue")
_ <- queue.take.flatMap(putStrLn)
} yield ()
val program = ZIO.runtime[MainEnv] // top level to run both tasks
.flatMap {
implicit rts =>
for {
_ <- task1.fork
_ <- task2
} yield ()
}
val runEnv = new MainEnv with Console.Live with Clock.Live
def run(args: List[String]) =
program.provide(runEnv).fold(_ => 1, _ => 0)
}
Here is the build.sbt I used
val ZioVersion = "1.0.0-RC13"
lazy val root = (project in file("."))
.settings(
organization := "example",
name := "example",
version := "0.0.1-SNAPSHOT",
scalaVersion := "2.12.8",
scalacOptions ++= Seq("-Ypartial-unification"),
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % ZioVersion,
),
addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.6"),
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.2.4")
)
scalacOptions ++= Seq(
"-deprecation", // Emit warning and location for usages of deprecated APIs.
"-encoding", "UTF-8", // Specify character encoding used by source files.
"-language:higherKinds", // Allow higher-kinded types
"-language:postfixOps", // Allows operator syntax in postfix position (deprecated since Scala 2.10)
"-feature", // Emit warning and location for usages of features that should be imported explicitly.
"-Ypartial-unification", // Enable partial unification in type constructor inference
"-Xfatal-warnings", // Fail the compilation if there are any warnings
)
In the Official Gitter Channel for ZIO Core, Adam Fraser suggested
You would want to have you environment just have a
Queue[String]
and then you would want to use a method likeprovideM
withQueue.unbounded
to create one queue and provide it to your whole application. That's whereprovideM
as opposed toprovide
comes in. It let's you satisfy an environment that requires anA
by providing aZIO[A]
.
A little digging into the ZIO source revealed a helpful example in DefaultTestReporterSpec.scala.
By defining the Environment as
trait MainEnv extends Console with Clock // environment with queue
{
val mainQueue: Queue[String]
}
changing my tasks to access env.mainQueue
with =
instead of <-
(because mainQueue is a Queue[String]
now and not a UIO[Queue[String]]
, removing runEnv
and changing the run
method in my test to use provideSomeM
def run(args: List[String]) =
program.provideSomeM(
for {
q <- Queue.unbounded[String]
} yield new MainEnv with Console.Live with Clock.Live {
override val mainQueue = q
}
).fold(_ => 1, _ => 0)
I was able to get the intended result:
sbt:example> run
[info] Running example.Main
env example.Main$$anon$1@45bfc0da queue zio.Queue$$anon$1@13b73d56
env example.Main$$anon$1@45bfc0da queue zio.Queue$$anon$1@13b73d56
Give me Coffee!
[success] Total time: 1 s, completed Oct 1, 2019 7:41:47 AM
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With