
What's the Akka-typed equivalent to pipeTo?

I'm currently trying to rewrite an existing untyped actor into a typed one. Since the actor is talking to a MySQL database using ScalikeJDBC, and since I'd like to have that done asynchronously, I'm dealing with Futures coming out of a separate (non-actor) repository class.

With untyped Akka, in an actor's receive method, I could do this:

import akka.pattern.pipe
val horseList : Future[Seq[Horse]] = horseRepository.listHorses(...)
horseList pipeTo sender()

And the sender actor would eventually receive a list of horses. I can't figure out how to do the same thing inside a typed Behavior, for example:

val behaviour : Behavior[ListHorses] = Behaviors.receive { 
    (ctx,msg) => msg match {
        case ListHorses(replyTo) => 
            val horseListF : Future[Seq[Horse]] = horseRepository.listHorses(...)
            // -> how do I make horseListF's content end up at replyTo? <-
            Behaviors.same
    }
}

The pipe pattern doesn't work (as it expects an untyped ActorRef), and so far I haven't found anything else in the akka-actor-typed (2.5.12) dependency I'm using to make this work.

How do I do this?

1flx asked May 03 '18 18:05



2 Answers

As of Akka 2.5.22 (possibly earlier), the typed ActorContext provides pipeToSelf:

  def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit

Here T is the actor's own message type, so you still have to pattern match on Success and Failure to turn the result into a message. In my code I've reduced that with this sugar:

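import scala.util.{Failure, Success, Try}

// Combines a success mapper and a failure mapper into the Try handler pipeToSelf expects.
def mapPipe[A, T](success: A => T, failure: Throwable => T): Try[A] => T = {
  case Success(value) => success(value)
  case Failure(e) => failure(e)
}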
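
To see what the helper produces, here is a minimal sketch that applies it outside any actor, using only the standard library. The `Command`, `Loaded` and `LoadFailed` names are hypothetical and stand in for whatever message protocol your actor uses.

```scala
import scala.util.{Failure, Success, Try}

object MapPipeDemo {
  // Hypothetical message protocol, for illustration only.
  sealed trait Command
  final case class Loaded(names: Seq[String]) extends Command
  final case class LoadFailed(cause: Throwable) extends Command

  // The helper from the answer: two plain functions become one Try handler.
  def mapPipe[A, T](success: A => T, failure: Throwable => T): Try[A] => T = {
    case Success(value) => success(value)
    case Failure(e)     => failure(e)
  }

  // Explicit type arguments widen both branches to the common Command type.
  val toCommand: Try[Seq[String]] => Command =
    mapPipe[Seq[String], Command](Loaded(_), LoadFailed(_))
}
```

The resulting function is exactly the shape pipeToSelf takes as its second argument list.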

Resulting in a call like this:

case class Horses(horses: Seq[Horse]) extends Command
case class HorseFailure(e: Throwable) extends Command

...

context.pipeToSelf(horseList) {
  mapPipe(Horses,HorseFailure)
}
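
Note that pipeToSelf delivers the result to the actor itself, not to replyTo, so the actor still has to forward it. The following is a minimal sketch of one way to do that, assuming Akka 2.5.22 or later. The `HorseRepository` trait, the `Horse` fields, and the internal `HorsesLoaded` and `HorsesFailed` messages are hypothetical names for illustration. The behavior is typed as `Behavior[Command]` rather than `Behavior[ListHorses]` because the actor must also accept its own result messages.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.Future
import scala.util.{Failure, Success}

object HorseActor {
  final case class Horse(name: String)

  // Hypothetical stand-in for the non-actor repository class.
  trait HorseRepository { def listHorses(): Future[Seq[Horse]] }

  sealed trait Command
  final case class ListHorses(replyTo: ActorRef[Seq[Horse]]) extends Command
  // Internal messages: they carry the future's outcome back into the actor.
  private final case class HorsesLoaded(horses: Seq[Horse], replyTo: ActorRef[Seq[Horse]]) extends Command
  private final case class HorsesFailed(cause: Throwable) extends Command

  def apply(repo: HorseRepository): Behavior[Command] =
    Behaviors.receive { (ctx, msg) =>
      msg match {
        case ListHorses(replyTo) =>
          // Keep replyTo inside the piped message so the reply can be sent later.
          ctx.pipeToSelf(repo.listHorses()) {
            case Success(horses) => HorsesLoaded(horses, replyTo)
            case Failure(e)      => HorsesFailed(e)
          }
          Behaviors.same
        case HorsesLoaded(horses, replyTo) =>
          // Runs as normal message processing, so actor state is safe to touch here.
          replyTo ! horses
          Behaviors.same
        case HorsesFailed(cause) =>
          ctx.log.error("Listing horses failed: {}", cause.getMessage)
          Behaviors.same
      }
    }
}
```

Routing the result through the actor's mailbox costs one extra message, but the result is then handled like any other message, which matters if the reply depends on actor state.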
Arne Claassen answered Sep 19 '22 13:09



You can simply send a message to replyTo when the future completes successfully:

case ListHorses(replyTo) => 
    horseRepository.listHorses(...) foreach { horses => replyTo ! horses }
    Behaviors.same

Or if you want to handle errors as well:

case ListHorses(replyTo) =>
    horseRepository.listHorses(...) onComplete { 
        case Success(horses) => replyTo ! horses
        case Failure(e) => // error handling 
    }
    Behaviors.same

In order for this to work, you need an ExecutionContext. It usually makes sense to use the same one as the actor, so you will have to make it available to onComplete or foreach first:

implicit val ec: scala.concurrent.ExecutionContext = ctx.executionContext
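
Putting the pieces together, a minimal sketch of the direct-reply approach might look like the following. The `HorseRepository` trait and the `Reply`, `Horses` and `ListingFailed` types are hypothetical. The original question replies with a bare `Seq[Horse]`, and a sealed reply type is assumed here only so that failures can be reported to the requester too. Sending to an ActorRef from a future callback is safe, but the callback should not touch `ctx` or mutable actor state, which is why the execution context is read before the future is started.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object HorseLister {
  final case class Horse(name: String)

  // Hypothetical stand-in for the non-actor repository class.
  trait HorseRepository { def listHorses(): Future[Seq[Horse]] }

  // Assumed reply protocol, so the requester can tell success from failure.
  sealed trait Reply
  final case class Horses(horses: Seq[Horse]) extends Reply
  final case class ListingFailed(reason: String) extends Reply

  final case class ListHorses(replyTo: ActorRef[Reply])

  def apply(repo: HorseRepository): Behavior[ListHorses] =
    Behaviors.receive { (ctx, msg) =>
      // Read on the actor's thread and brought into scope before onComplete is called.
      implicit val ec: ExecutionContext = ctx.executionContext
      repo.listHorses().onComplete {
        // The callback only captures msg, never ctx.
        case Success(horses) => msg.replyTo ! Horses(horses)
        case Failure(e)      => msg.replyTo ! ListingFailed(e.getMessage)
      }
      Behaviors.same
    }
}
```

This keeps the actor's protocol at a single message type, at the price of the reply being sent from outside the actor's normal message processing.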
lex82 answered Sep 18 '22 13:09
