I'm currently trying to rewrite an existing untyped actor into a typed one. Since the actor is talking to a MySQL database using ScalikeJDBC, and since I'd like to have that done asynchronously, I'm dealing with Futures coming out of a separate (non-actor) repository class.
With untyped Akka, in an actor's receive method, I could do this:
import akka.pattern.pipe
val horseList : Future[Seq[Horse]] = horseRepository.listHorses(...)
horseList pipeTo sender()
And the sender actor would eventually receive a list of horses. I can't figure out how to do this inside a Behaviour, like:
val behaviour : Behavior[ListHorses] = Behaviors.receive {
(ctx,msg) => msg match {
case ListHorses(replyTo) =>
val horseListF : Future[Seq[Horse]] = horseRepository.listHorses(...)
// -> how do I make horseListF's content end up at replyTo? <-
Behaviors.same
}
}
The pipe pattern doesn't work (as it expects an untyped ActorRef), and so far I haven't found anything else in the akka-actor-typed
(2.5.12) dependency I'm using to make this work.
How do I do this?
In Akka 2.5.22 (maybe earlier) there is context.pipeToSelf
:
def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit
You still have to provide a pattern match for Success
and Failure
, which in my code I've reduced with this sugar:
def mapPipe[A, T](success: A => T, failure: Throwable => T): Try[A] => T = {
case Success(value) => success(value)
case Failure(e) => failure(e)
}
Resulting in a call like this:
case class Horses(horses: Seq[Horse]) extends Command
case class HorseFailure(e: Throwable) extends Command
...
context.pipeToSelf(horseList) {
mapPipe(Horses,HorseFailure)
}
You can simply send a message to replyTo
when the future completes successfully:
case ListHorses(replyTo) =>
horseRepository.listHorses(...) foreach { horses => replyTo ! horses }
Behaviors.same
Or if you want to handle errors as well:
case ListHorses(replyTo) =>
horseRepository.listHorses(...) onComplete {
case Success(horses) => replyTo ! horses
case Failure(e) => // error handling
}
Behaviors.same
In order for this to work, you need an ExecutionContext
. It usually makes sense to use the same one as the actor, so you will have to make it available to onComplete
or foreach
first:
implicit val ec = ctx.executionContext
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With