As the Akka documentation explains, importing akka.pattern.pipe should make the pipeTo
method available on scala.concurrent.Future, like this:
import akka.pattern.pipe
val future = ...
future pipeTo sender()
Unfortunately, that does not work for me: my IDE reports the error "can't resolve symbol pipeTo".
As a workaround, I had to use this syntax:
pipe(future) pipeTo sender()
But it still bothers me that I can't figure out why (I'm quite new to Scala, by the way). Thanks a lot for helping me understand this puzzle.
Scala 2.12.2, Akka 2.5.3
You need to have an implicit ExecutionContext in scope. Here is an example:
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.pipe
import scala.concurrent.Future
// Get the implicit ExecutionContext from this import
import scala.concurrent.ExecutionContext.Implicits.global
object Hello extends App {
  // Creating a simple actor
  class MyActor extends Actor {
    override def receive: Receive = {
      case x => println(s"Received message: ${x.toString}")
    }
  }

  // Create actor system
  val system = ActorSystem("example")
  val ref = system.actorOf(Props[MyActor], "actor")

  // Create the future to pipe
  val future: Future[Int] = Future(100)

  // Test
  future pipeTo ref
}
Console:
sbt run
[info] <stuff here>
[info] Running example.Hello
Received message: 100
The reason you have to do that is that pipeTo is an instance method on PipeableFuture, and your regular Future has to be "enhanced" to a PipeableFuture through an implicit conversion. Here is the constructor for PipeableFuture; note the implicit executionContext: ExecutionContext parameter:
final class PipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext)
The full class is here, where you can see the pipeTo function:
final class PipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext) {
  def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
    future andThen {
      case Success(r) ⇒ recipient ! r
      case Failure(f) ⇒ recipient ! Status.Failure(f)
    }
  }

  def pipeToSelection(recipient: ActorSelection)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
    future andThen {
      case Success(r) ⇒ recipient ! r
      case Failure(f) ⇒ recipient ! Status.Failure(f)
    }
  }

  def to(recipient: ActorRef): PipeableFuture[T] = to(recipient, Actor.noSender)

  def to(recipient: ActorRef, sender: ActorRef): PipeableFuture[T] = {
    pipeTo(recipient)(sender)
    this
  }

  def to(recipient: ActorSelection): PipeableFuture[T] = to(recipient, Actor.noSender)

  def to(recipient: ActorSelection, sender: ActorRef): PipeableFuture[T] = {
    pipeToSelection(recipient)(sender)
    this
  }
}
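To see the mechanism in isolation, here is a minimal sketch that needs no Akka dependency. LabelledFuture, labelled, and withLabel are hypothetical names made up for this illustration; they only mimic the shape of PipeableFuture and pipe (a wrapper class plus an implicit conversion that both require an implicit ExecutionContext) and are not part of any library.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.implicitConversions

object EnrichmentSketch {
  // Hypothetical stand-in for PipeableFuture: the wrapper can only be
  // constructed when an implicit ExecutionContext is available.
  final class LabelledFuture[T](val future: Future[T])(implicit ec: ExecutionContext) {
    def withLabel(label: String): Future[String] = future.map(v => s"$label: $v")
  }

  // Hypothetical stand-in for akka.pattern.pipe: an implicit conversion
  // that itself takes an implicit ExecutionContext.
  implicit def labelled[T](future: Future[T])(implicit ec: ExecutionContext): LabelledFuture[T] =
    new LabelledFuture(future)

  def demo(): String = {
    // If this line is removed, the conversion is no longer applicable and the
    // call below should fail to compile with a "not a member of Future" error.
    implicit val ec: ExecutionContext = ExecutionContext.global

    val future = Future.successful(100)
    // The compiler rewrites this to labelled(future)(ec).withLabel("value")
    Await.result(future.withLabel("value"), 3.seconds)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The point of the sketch is that the extension method only appears on Future when every implicit argument of the conversion can be resolved at the call site.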
Since pipe(future) is an explicit function call rather than an instance method on Future, it works in your example.
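Since the question pipes to sender(), the code presumably lives inside an actor. There, a common alternative to the global context is to import the actor's own dispatcher, which is an implicit ExecutionContext. The following is a sketch under that assumption; the Doubler actor, the PipeInsideActor object, and the askDoubler helper are hypothetical names used only for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical actor that replies with twice the number it receives
class Doubler extends Actor {
  // Brings the actor's dispatcher into scope as the implicit ExecutionContext,
  // which both Future.apply and the pipe conversion need
  import context.dispatcher

  override def receive: Receive = {
    // sender() is evaluated here, on the actor's thread, before the future completes
    case n: Int => Future(n * 2) pipeTo sender()
  }
}

object PipeInsideActor {
  // Hypothetical helper: starts a system, asks the actor once, then shuts down
  def askDoubler(n: Int): Any = {
    val system = ActorSystem("example")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val doubler = system.actorOf(Props[Doubler], "doubler")
      Await.result(doubler ? n, timeout.duration)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"Reply: ${askDoubler(21)}")
}
```

Whether to use the actor's dispatcher or a dedicated ExecutionContext depends on how heavy the work inside the Future is; blocking work is usually better kept off the actor's dispatcher.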