I have an asynchronous control-flow like the following:
ActorA ! DoA(dataA, callback1, callbackOnErrorA)
def callback1() = {
...
ActorB ! DoB(dataB, callback2, callbackOnErrorB)
}
def callback2() = {
ActorC ! DoC(dataC, callback3, callbackOnErrorC)
}
...
How would I divide this flow into several parts (continuations) and sequentially dispatch these to different actors (or threads/tasks) while maintaining the overall state?
Any hint appreciated, Thanks
I like to use scalaz.concurrent.Promise
. This example isn't exactly like the one in your question, but it gives you the idea.
object Async extends Application {
import scalaz._
import Scalaz._
import concurrent._
import concurrent.strategy._
import java.util.concurrent.{ExecutorService, Executors}
case class ResultA(resultb: ResultB, resulta: ResultC)
case class ResultB()
case class ResultC()
run
def run {
implicit val executor: ExecutorService = Executors.newFixedThreadPool(8)
import Executor.strategy
val promiseA = doA
println("waiting for results")
val a: ResultA = promiseA.get
println("got " + a)
executor.shutdown
}
def doA(implicit s: Strategy[Unit]): Promise[ResultA] = {
println("triggered A")
val b = doB
val c = doC
for {bb <- b; cc <- c} yield ResultA(bb, cc)
}
def doB(implicit s: Strategy[Unit]): Promise[ResultB] = {
println("triggered B")
promise { Thread.sleep(1000); println("returning B"); ResultB() }
}
def doC(implicit s: Strategy[Unit]): Promise[ResultC] = {
println("triggered C")
promise { Thread.sleep(1000); println("returning C"); ResultC() }
}
}
Output:
triggered A
triggered B
triggered C
waiting for results
returning B
returning C
got ResultA(ResultB(),ResultC())
You'll find an introduction to Scalaz concurrency in this presentation from Runar.
This approach isn't as flexible as Actors, but composes better and can't deadlock.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With