
Akka/Scala: mapping Future vs pipeTo

Tags: scala, akka, actor

In Akka actors, are there any differences, in terms of the number of threads used or thread locking, between sending a Future result to another actor by:

A. mapping the Future to a function that tells the result to the actor.

B. defining an onSuccess callback on the Future, which tells the result to the actor.

C. piping the Future result to the actor with pipeTo.

Some of these options are discussed in a previous question:

Akka: Send a future message to an Actor

Which one of the three is the preferred way to do it, and why?

Also, I would like to know, if receive should be of type Any => Unit, then why does the code compile when in some cases the partial function of receive returns a Future, not Unit?

Here is a code example of the three options that I mentioned above:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import akka.pattern.pipe

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Success

class ActorIncrement extends Actor {

  def receive = {
    case i: Int =>
      println(s"increment $i")
      sender ! i + 1
  }
}

class ActorEven extends Actor {

  def receive = {
    case i: Int =>
      println(s"$i is even")
  }
}


class ActorOdd extends Actor {

  def receive = {
    case i: Int =>
      println(s"$i is odd")
  }
}

class MyActor(actorIncrement: ActorRef, actorEven: ActorRef, actorOdd: ActorRef) extends Actor {
  import scala.concurrent.ExecutionContext.Implicits.global

  implicit val timeout = Timeout(5 seconds)

  def receive = {
    case i: Int if i % 2 == 0 =>
      println(s"receive a: $i")
      actorIncrement ? i map {
        case j: Int =>
          println(s"$j from increment a")
          actorOdd ! j
      }
    case i: Int =>
      println(s"receive b: $i")
      val future: Future[Any] = actorIncrement ? i
      future onSuccess {
        case i: Int =>
          println(s"$i from increment b")
          actorEven ! i
      }

    case s: String =>
      println(s"receive c: $s")
      (actorIncrement ? s.toInt).mapTo[Int] filter(_ % 2 == 0) andThen { case Success(i: Int) => println(s"$i from increment c") } pipeTo actorEven
  }
}

object TalkToActor extends App {

  // Create the 'talk-to-actor' actor system
  val system = ActorSystem("talk-to-actor")

  val actorIncrement = system.actorOf(Props[ActorIncrement], "actorIncrement")
  val actorEven = system.actorOf(Props[ActorEven], "actorEven")
  val actorOdd = system.actorOf(Props[ActorOdd], "actorOdd")

  val myActor = system.actorOf(Props(new MyActor(actorIncrement, actorEven, actorOdd)), "myActor")

  myActor ! 2
  myActor ! 7
  myActor ! "11"

  Thread.sleep(1000)

  //shutdown system
  system.terminate()
}
rapt asked Dec 11 '17 05:12



1 Answer

Look at how pipeTo is defined in akka.pattern.PipeToSupport:

// Method of PipeableFuture[T]; `future` is the wrapped Future[T].
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
  future andThen {
    case Success(r) ⇒ recipient ! r
    case Failure(f) ⇒ recipient ! Status.Failure(f)
  }
}

As you can see, pipeTo does nothing more than add an andThen call to your Future. That call sends the Future's result to the piped actor, or a Status.Failure message if the Future fails.
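To make the andThen versus map distinction concrete, here is a minimal sketch that uses only the Scala standard library. The `tell` function is a hypothetical stand-in for `recipient ! msg` and is not an Akka API; `pipeStyle` and `mapStyle` are likewise names invented for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object AndThenVsMap {
  // Hypothetical stand-in for `recipient ! msg`; not part of Akka.
  def tell(msg: Any): Unit = println(s"recipient got: $msg")

  // pipeTo-style: the side effect runs for success and failure,
  // and the returned future keeps the original result.
  def pipeStyle(result: Future[Int]): Future[Int] =
    result andThen {
      case Success(r) => tell(r)
      case Failure(f) => tell(s"Failure($f)")
    }

  // map-style: the function only runs on success, and its Unit result
  // becomes the new value, so the type changes to Future[Unit].
  def mapStyle(result: Future[Int]): Future[Unit] =
    result map { r => tell(r) }

  def main(args: Array[String]): Unit = {
    val ok     = Future(41 + 1)
    val failed = Future[Int](throw new IllegalStateException("boom"))

    println(Await.result(pipeStyle(ok), 1.second))
    Await.result(mapStyle(ok), 1.second)

    // The failed future notifies the recipient only in the pipeTo-style version.
    Await.ready(pipeStyle(failed), 1.second)
    Await.ready(mapStyle(failed), 1.second)
  }
}
```

In both versions the callback runs on the implicit ExecutionContext rather than on the actor's own message-processing turn, so neither form blocks a thread while waiting for the result.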

So the main difference lies in this Status.Failure failure handling. If you are not using pipeTo, you can handle the failure in whatever way you want.
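One way to keep pipeTo and still control failure handling is to recover the Future before piping it. The sketch below assumes classic Akka actors; `IncrementFailed`, `Increment.safely`, `Forwarder` and `Printer` are hypothetical names made up for this example, not anything from the question or from Akka.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Status}
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical domain message sent instead of Status.Failure.
final case class IncrementFailed(input: String, reason: String)

object Increment {
  // Parses and increments asynchronously; a parse error becomes IncrementFailed.
  def safely(s: String)(implicit ec: ExecutionContext): Future[Any] =
    Future(s.toInt + 1).recover {
      case e: NumberFormatException => IncrementFailed(s, e.getMessage)
    }
}

class Forwarder(target: ActorRef) extends Actor {
  import context.dispatcher // the actor's dispatcher as the ExecutionContext

  def receive: Receive = {
    case s: String =>
      // Bad input reaches target as IncrementFailed because recover ran first.
      Increment.safely(s).pipeTo(target)
  }
}

class Printer extends Actor {
  def receive: Receive = {
    case i: Int                   => println(s"got $i")
    case IncrementFailed(in, why) => println(s"could not increment '$in': $why")
    // Any failure that recover did not match still arrives wrapped by pipeTo.
    case Status.Failure(cause)    => println(s"unhandled failure: $cause")
  }
}

object RecoverThenPipe {
  def main(args: Array[String]): Unit = {
    val system    = ActorSystem("recover-then-pipe")
    val printer   = system.actorOf(Props(new Printer), "printer")
    val forwarder = system.actorOf(Props(new Forwarder(printer)), "forwarder")

    forwarder ! "11"
    forwarder ! "eleven"

    Thread.sleep(1000)
    system.terminate()
  }
}
```

The alternative is to drop pipeTo and write the andThen or onComplete callback yourself, which is the same mechanism with your own choice of message for the failure case.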

sarveshseri answered Sep 27 '22 20:09