Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spray Akka Json Unmarshalling

I've a problem about unmarshalling objects to Json via using spray - akka.

When i'd like to use actors that returns Future[List[Person]] , it doesn't work.

If i use dao object directly, it works.

Here are my codes:

PersonDao.scala

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

case class Person(id: Int, name: String, surname: String)

object PersonDao {

  def getAll: Future[List[Person]] = Future {
    List[Person](Person(1, "Bilal", "Alp"), Person(2, "Ahmet", "Alp"))
  }
}

EntityServiceActor.scala

import akka.actor.Actor
import com.bilalalp.akkakafka.model.PersonDao
import com.bilalalp.akkakafka.service.ServiceOperation.FIND_ALL

object ServiceOperation {

  case object FIND_ALL

}

class EntityServiceActor extends Actor {

  override def receive: Receive = {

    case FIND_ALL => PersonDao.getAll
  }
}

ServerSupervisor.scala

import akka.actor.{Actor, ActorRefFactory}
import com.bilalalp.akkakafka.webservice.TaskWebService
import spray.routing.RejectionHandler.Default


class ServerSupervisor extends Actor with PersonWebService {

  implicit val system = context.system

  override def receive: Receive = runRoute(entityServiceRoutes)

  override implicit def actorRefFactory: ActorRefFactory = context
}

WebServiceTrait.scala

import akka.util.Timeout

import spray.routing.HttpService

import scala.concurrent.duration._
import scala.language.postfixOps

import org.json4s.NoTypeHints
import org.json4s.native.Serialization._

trait WebServiceTrait extends HttpService {

  implicit def executionContext = actorRefFactory.dispatcher

  implicit val json4sFormats = formats(NoTypeHints)

  implicit val timeout = Timeout(120 seconds)
}

PersonWebService.scala

trait PersonWebService extends WebServiceTrait with Json4sSupport {

  val json3sFormats = DefaultFormats

  val entityServiceWorker = actorRefFactory.actorOf(Props[EntityServiceActor], "entityServiceActor")

  val entityServiceRoutes = {
    pathPrefix("person") {
      pathEndOrSingleSlash {
        get {
          ctx => ctx.complete((entityServiceWorker ? FIND_ALL).mapTo[Person])
        }
      }
    }
  }
}

Application.scala

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.io.IO
import com.bilalalp.akkakafka.server.ServerSupervisor
import spray.can.Http


object Application extends App {

  implicit val system = ActorSystem("actorSystem")

  val mainHandler: ActorRef = system.actorOf(Props[ServerSupervisor])
  IO(Http)! Http.Bind(mainHandler, interface = Configuration.appInterface, port = Configuration.appPort)

}

When i run this code, It gives nothing and waits for a while.

After waiting browser gives this message:

The server was not able to produce a timely response to your request.

And console output is

[ERROR] [11/22/2015 21:15:24.109] [actorSystem-akka.actor.default-dispatcher-7] [akka.actor.ActorSystemImpl(actorSystem)] Error during processing of request HttpRequest(GET,http://localhost:3001/person/,List(Host: localhost:3001, Connection: keep-alive, Cache-C ontrol: no-cache, Pragma: no-cache, User-Agent: Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Maxthon/4.4.6.1000 Chrome/30.0.1599.101 Safari/537.36, DNT: 1, Accept-Encoding: gzip, deflate, Accept-Language: tr-TR),Empty,HTTP/1.1) akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://actorSystem/user/$a/entityServiceActor#-1810673919]] after [120000 ms]. Sender[null] sent message of type "com.bilalalp.akkakafka.service.ServiceOperation$FIND_ALL$". at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:415) at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:132) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)

If i change PersonWebService.scala to this :

trait PersonWebService extends WebServiceTrait with Json4sSupport {

  val json3sFormats = DefaultFormats

  val entityServiceWorker = actorRefFactory.actorOf(Props[EntityServiceActor], "entityServiceActor")

  val entityServiceRoutes = {
    pathPrefix("person") {
      pathEndOrSingleSlash {
        get (
//                    ctx => ctx.complete((entityServiceWorker ? FIND_ALL).mapTo[Person])
          ctx => ctx.complete(PersonDao getAll)
        )
      }
    }
  }
}

It works and output is :

[{"id":1,"name":"Bilal","surname":"Alp"},{"id":2,"name":"Ahmet","surname":"Alp"}]

I'd like to use actors in spray routes. I don't know whether it is a bad practice or not because i'm newbie in akka and spray.

How can i solve this? Any ideas?

Thank you.

like image 718
Bilal ALP Avatar asked Nov 22 '15 19:11

Bilal ALP


2 Answers

First of all, You can type (PersonWebService.scala):

pathEndOrSingleSlash {
    get {
      complete {
       (entityServiceWorker ? FindAll).mapTo[List[Person]]
    }
  }

And as @Timothy Kim said You need to send back results using "sender ! getAll.onComplete

As I an see getAll returns Future, so in my opinion best would be to resolve it in EntityServiceActor.scala:

// import the pipe pattern (see pipeTo below):
import akka.pattern.pipe
import context.dispatcher

override def receive: Receive = {
  case FindAll => 
    PersonDao.getAll()
      .recover({ case err => List() /* could log error here */ })
      .pipeTo(sender()) // do this instead of onComplete, it's safer

in this simple case getAll Future is resolved, if everything is ok, service will get list of persons, otherwise List will be empty.

Oh and another thing PersonWebService.scala should have .mapTo[List[Person]]

like image 58
darek Avatar answered Oct 21 '22 12:10

darek


You need to send a result back to sender:

case FIND_ALL =>
  PersonDao.getAll.pipeTo(sender())
like image 3
Timothy Klim Avatar answered Oct 21 '22 12:10

Timothy Klim