Akka non-blocking options when an HTTP response is required

I understand how to make a message-based, non-blocking application in Akka, and can easily mock up examples that perform concurrent operations and pass back the aggregated results in a message. Where I have difficulty is understanding what my non-blocking options are when my application has to respond to an HTTP request. The goal is to receive a request and immediately hand it over to a local or remote actor to do the work, which in turn will hand it off to get a result that could take some time. Unfortunately, under this model I don't understand how I could express this with a non-blocking series of "tells" rather than blocking "asks". If at any point in the chain I use a tell, I no longer have a future to use as the eventual response content (required by the HTTP framework interface, which in this case is Finagle, but that is not important). I understand the request is on its own thread, and my example is quite contrived, but I'm just trying to understand my design options.

In summary, if my contrived example below can be reworked to block less, I would very much love to understand how. This is my first use of Akka since some light exploration over a year ago, and every article, document, and talk I have seen says not to block for services.

Conceptual answers may be helpful but may also repeat what I have already read. Working from or editing my example would likely be key to my understanding of the exact problem I am trying to solve. If the current example is generally what needs to be done, that confirmation is helpful too, so I don't search for magic that does not exist.

Note the following aliases: import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait, Promise => TwitterPromise}

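// Imports needed in addition to the com.twitter.util aliases noted above
// (Netty 3 packages, as used by Finagle's HTTP codec at the time)
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import com.twitter.finagle.Service
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.handler.codec.http.{DefaultHttpResponse, HttpRequest, HttpResponse, HttpResponseStatus, HttpVersion}
import org.jboss.netty.util.CharsetUtil
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.util.{Failure, Success}

object Server {

  val system = ActorSystem("Example-System")

  // ExecutionContext used by map, onComplete and pipe below
  import system.dispatcher

  // Timeout applied to every ask (?) in this example
  implicit val timeout: Timeout = Timeout(1.second)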

  implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = {
    val promise = TwitterPromise[T]
    scFuture onComplete {
      case Success(result)  ⇒ promise.setValue(result)
      case Failure(failure) ⇒ promise.setException(failure)
    }
    promise
  }

  val service = new Service[HttpRequest, HttpResponse] {
    def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match {
      case "/a/b/c" =>
        val w1 = system.actorOf(Props(new Worker1))

        val r = w1 ? "take work"

        val response: Future[HttpResponse] = r.mapTo[String].map { c =>
          val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
          resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
          resp
        }

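        response

      // Any other path: answer with a 404 instead of throwing a MatchError
      case _ =>
        TwitterFuture.value(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
    }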
  }

//val server = Http.serve(":8080", service); TwitterAwait.ready(server)

  class Worker1 extends Actor with ActorLogging {
    def receive = {
      case "take work" =>
        val w2 = context.actorOf(Props(new Worker2))
        pipe (w2 ? "do work") to sender
    }
  }

  class Worker2 extends Actor with ActorLogging {
    def receive = {
      case "do work" =>
        //Long operation...
        sender ! "The Work"
    }
  }

  def main(args: Array[String]): Unit = {
    val r = service.apply(
      com.twitter.finagle.http.Request("/a/b/c")
    )
    println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work
  }
}

Thanks in advance for any guidance offered!

Eric asked Aug 21 '13 18:08



1 Answer

You can avoid sending a future as a message by using the pipe pattern—i.e., in Worker1 you'd write:

pipe(w2 ? "do work") to sender

Instead of:

sender ! (w2 ? "do work")

Now r will be a Future[String] instead of a Future[Future[String]].
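For reference, here is a minimal, self-contained sketch of the pipe pattern. It assumes the classic actor API of a reasonably recent Akka (2.4 or later, for system.terminate()) on the classpath; the names SlowWorker, PipingWorker and PipeDemo are made up for illustration and are not from the question. The upper-casing step stands in for any work the intermediate actor might do on the reply before passing it on.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for Worker2: replies with a plain String.
class SlowWorker extends Actor {
  def receive: Receive = {
    case "do work" => sender() ! "The Work"
  }
}

// Hypothetical stand-in for Worker1: asks the child and pipes the
// (transformed) result back, so the caller never receives a Future as a message.
class PipingWorker extends Actor {
  import context.dispatcher // ExecutionContext for map and pipeTo
  implicit val timeout: Timeout = Timeout(1.second)

  private val child = context.actorOf(Props(new SlowWorker))

  def receive: Receive = {
    case "take work" =>
      val reply = (child ? "do work").mapTo[String].map(_.toUpperCase)
      // sender() is evaluated here, while the message is being handled,
      // not later inside the future's callback.
      reply.pipeTo(sender())
  }
}

object PipeDemo {
  def run(): String = {
    val system = ActorSystem("pipe-demo")
    implicit val timeout: Timeout = Timeout(1.second)
    try {
      val worker = system.actorOf(Props(new PipingWorker))
      // Blocking only so the demo can return the result; a service would
      // keep composing the Future instead.
      Await.result((worker ? "take work").mapTo[String], 2.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling PipeDemo.run() should give back the upper-cased string once the piped reply arrives.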


Update: the pipe solution above is a general way to avoid having your actor respond with a future. As Viktor points out in a comment below, in this case you can take your Worker1 out of the loop entirely by telling Worker2 to respond directly to the actor that it (Worker1) got the message from:

w2.tell("do work", sender)

This won't be an option if Worker1 is responsible for operating on the response from Worker2 in some way (by using map on w2 ? "do work", combining multiple futures with flatMap or a for-comprehension, etc.), but if that's not necessary, this version is cleaner and more efficient.
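A rough sketch of that tell-with-sender variant, again assuming the classic Akka actor API (2.4 or later) and using made-up names (ReplyingWorker, RelayWorker, RelayDemo). The only ask left is the one at the edge, where a Future is needed for the HTTP layer; inside the actor chain everything is a tell.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for Worker2: replies to whoever is set as sender.
class ReplyingWorker extends Actor {
  def receive: Receive = {
    case "do work" => sender() ! "The Work"
  }
}

// Hypothetical stand-in for Worker1: hands the work on with a tell and
// passes along the original sender, so the reply bypasses this actor.
class RelayWorker extends Actor {
  private val child = context.actorOf(Props(new ReplyingWorker))

  def receive: Receive = {
    case "take work" =>
      child.tell("do work", sender()) // same effect as child.forward("do work")
  }
}

object RelayDemo {
  def run(): String = {
    val system = ActorSystem("relay-demo")
    implicit val timeout: Timeout = Timeout(1.second)
    try {
      val relay = system.actorOf(Props(new RelayWorker))
      // Blocking only so the demo can return the result.
      Await.result((relay ? "take work").mapTo[String], 2.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Note that RelayWorker needs neither a timeout nor an ExecutionContext, since it never creates a Future of its own.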


That kills one Await.result. You can get rid of the other by writing something like the following:

val response: Future[HttpResponse] = r.mapTo[String].map { c =>
  val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
  resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8))
  resp
}

Now you just need to turn this Future into a TwitterFuture. I can't tell you off the top of my head exactly how to do this, but it should be fairly trivial, and definitely doesn't require blocking.
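One way the conversion could look, as a sketch rather than a definitive implementation: complete a Twitter Promise from the Scala future's callback, which is essentially what the implicit helper in the question does. This assumes com.twitter.util is on the classpath; FutureBridge and toTwitter are hypothetical names, and the conversion is written as an explicit method with an explicit ExecutionContext rather than as an implicit.

```scala
import com.twitter.util.{Future => TwitterFuture, Promise => TwitterPromise}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object FutureBridge {
  // Returns immediately; the Twitter promise is completed later, on the
  // given ExecutionContext, when the Scala future finishes.
  def toTwitter[T](f: Future[T])(implicit ec: ExecutionContext): TwitterFuture[T] = {
    val promise = new TwitterPromise[T]
    f.onComplete {
      case Success(value) => promise.setValue(value)
      case Failure(error) => promise.setException(error)
    }
    promise // a Promise is itself a Twitter Future
  }
}
```

With this in place the service can return FutureBridge.toTwitter(response) and no thread waits on the result at any point in the chain.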

Travis Brown answered Oct 07 '22 21:10
