Akka Dead Letters with Ask Pattern

Tags: scala, akka, actor

Apologies in advance if this is confusing; I'm dumping quite a bit here. I have a small service that fetches some JSON, parses and extracts it into case classes, then writes it to a database. The service needs to run on a schedule, which an Akka scheduler handles well. My database doesn't like it when Slick asks for a new AutoInc id from several places at the same time, so I added an Await.result to block until each write has completed.

All of this works quite well, but here is where my issue starts: there are 7 of these services, so I would like to block on each one using a similar Await.result approach. Every time I try to send the end time of the request back as a response (at the end of the else block), it goes to dead letters instead of to the Distributor.

In short: why does sender ! time go to dead letters and not to the Distributor? This is a long question for a simple problem, but that's how development goes...

ClickActor.scala

import java.text.SimpleDateFormat
import java.util.Date
import Message._
import akka.actor.{Actor, ActorLogging, Props}
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import net.liftweb.json._
import spray.client.pipelining._
import spray.http.{BasicHttpCredentials, HttpRequest, HttpResponse, Uri}
import akka.pattern.ask
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

case class ClickData(recipient : String, geolocation : Geolocation, tags : Array[String],
                     url : String, timestamp : Double, campaigns : Array[String],
                     `user-variables` : JObject, ip : String,
                     `client-info` : ClientInfo, message : ClickedMessage, event : String)
case class Geolocation(city : String, region : String, country : String)
case class ClientInfo(`client-name`: String, `client-os`: String, `user-agent`: String,
                      `device-type`: String, `client-type`: String)
case class ClickedMessage(headers : ClickHeaders)
case class ClickHeaders(`message-id` : String)

class ClickActor extends Actor with ActorLogging{

  implicit val formats = DefaultFormats
  implicit val timeout = new Timeout(3 minutes)
  import context.dispatcher

  val con = ConfigFactory.load("connection.conf")
  val countries = ConfigFactory.load("country.conf")
  val regions = ConfigFactory.load("region.conf")

  val df = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss -0000")
  var time = System.currentTimeMillis()
  var begin = new Date(time - (12 hours).toMillis)
  var end = new Date(time)

  val pipeline : HttpRequest => Future[HttpResponse] = (
    addCredentials(BasicHttpCredentials("api", con.getString("mailgun.key")))
      ~> sendReceive
    )

  def get(lastrun : Long): Future[String] = {

    if(lastrun != 0) {
      begin = new Date(lastrun)
      end = new Date(time)
    }

    val uri = Uri(con.getString("mailgun.uri")) withQuery("begin" -> df.format(begin), "end" -> df.format(end),
      "ascending" -> "yes", "limit" -> "100", "pretty" -> "yes", "event" -> "clicked")
    val request = Get(uri)
    val futureResponse = pipeline(request)
    return futureResponse.map(_.entity.asString)
  }

  def receive = {
    case lastrun : Long => {
      val start = System.currentTimeMillis()
      val responseFuture = get(lastrun)
      responseFuture.onSuccess {
        case payload: String => val json = parse(payload)
          //println(pretty(render(json)))
          val elements = (json \\ "items").children
          if (elements.length == 0) {
            log.info("[ClickActor: " + this.hashCode() + "] did not find new events between " +
              begin.toString + " and " + end.toString)
            sender ! time
            context.stop(self)
          }
          else {
            for (item <- elements) {
              val data = item.extract[ClickData]
              var tags = ""
              if (data.tags.length != 0) {
                for (tag <- data.tags)
                  tags += (tag + ", ")
              }
              var campaigns = ""
              if (data.campaigns.length != 0) {
                for (campaign <- data.campaigns)
                  campaigns += (campaign + ", ")
              }
              val timestamp = (data.timestamp * 1000).toLong
              val msg = new ClickMessage(
                data.recipient, data.geolocation.city,
                regions.getString(data.geolocation.country + "." + data.geolocation.region),
                countries.getString(data.geolocation.country), tags, data.url, timestamp,
                campaigns, data.ip, data.`client-info`.`client-name`,
                data.`client-info`.`client-os`, data.`client-info`.`user-agent`,
                data.`client-info`.`device-type`, data.`client-info`.`client-type`,
                data.message.headers.`message-id`, data.event, compactRender(item))
              val csqla = context.actorOf(Props[ClickSQLActor])
              val future = csqla.ask(msg)
              val result = Await.result(future, timeout.duration).asInstanceOf[Int]
              if (result == 1) {
                log.error("[ClickSQLActor: " + csqla.hashCode() + "] shutting down due to lack of system environment variables")
                context.stop(csqla)
              }
              else if(result == 0) {
                log.info("[ClickSQLActor: " + csqla.hashCode() + "] successfully wrote to the DB")
              }
            }
            sender ! time
            log.info("[ClickActor: " + this.hashCode() + "] processed |" + elements.length + "| new events in " +
              (System.currentTimeMillis() - start) + " ms")
          }
      }
    }
  }
}

Distributor.scala

import akka.actor.{Props, ActorSystem}
import akka.event.Logging
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent.duration._
import scala.concurrent.Await

class Distributor {

  implicit val timeout = new Timeout(10 minutes)
  var lastClick : Long = 0

  def distribute(system : ActorSystem) = {
    val log = Logging(system, getClass)

    val clickFuture = (system.actorOf(Props[ClickActor]) ? lastClick)
    lastClick = Await.result(clickFuture, timeout.duration).asInstanceOf[Long]
    log.info(lastClick.toString)

    //repeat process with other events (open, unsub, etc)
  }
}
Brian Bagdasarian asked Aug 20 '14 10:08

1 Answer

The reason is that sender is a method, not a stored value: it returns the sender of the message the actor is currently processing, so it is only valid while receive is handling that message. The future in the example above keeps running after receive has returned, and its onSuccess callback executes later, outside the actor's message handling. By the time the callback calls sender, the actor has already finished with the original message, so the call no longer yields the Distributor's reference and the reply ends up in dead letters.
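To make the timing concrete, here is a minimal sketch in plain Scala with no Akka dependency. FakeActor, currentSender and handle are hypothetical names made up for this illustration; they only imitate the way the sender of the current message stops being available once handling is over, and are not how Akka is implemented.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SenderCaptureDemo {
  // Hypothetical stand-in for an actor. `currentSender` plays the role of
  // sender(): it is only meaningful while a message is being handled.
  final class FakeActor {
    @volatile private var currentSender: String = "deadLetters"
    def sender(): String = currentSender

    // Handles one message from `from`; `work` is the asynchronous job.
    // Returns (sender looked up inside the callback, sender captured up front).
    def handle(from: String, work: Future[Unit]): (Future[String], Future[String]) = {
      currentSender = from
      val captured = sender()             // read while handling: still valid
      val late = work.map(_ => sender())  // read when the callback finally runs
      val safe = work.map(_ => captured)  // uses the captured value instead
      currentSender = "deadLetters"       // handling of this message is over
      (late, safe)
    }
  }

  def run(): (String, String) = {
    val actor = new FakeActor
    val gate = Promise[Unit]()
    val (late, safe) = actor.handle("Distributor", gate.future)
    gate.success(()) // the work completes only after handle has returned
    (Await.result(late, 1.second), Await.result(safe, 1.second))
  }
}
```

Calling SenderCaptureDemo.run() returns the pair (late, safe): the lookup made inside the callback sees the value that was reset after handling, while the one captured during handling still names the original sender.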

The fix is either to avoid the future altogether or, when you combine futures, actors, and sender, to capture the value of sender in a local val before you start the future.

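val s = sender  // capture the reply-to reference while still inside receive

val responseFuture = get(lastrun)
responseFuture.onSuccess {
  case payload: String =>
    ....
    s ! time  // s still points at the original sender
}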
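Another option, when the reply is simply the result of the future, is Akka's pipe pattern, which takes the target reference at the moment you call it. The following is a minimal sketch, assuming classic Akka actors (akka-actor) on the classpath and a release recent enough to provide system.terminate(). ReplyingActor and PipeToDemo are hypothetical names, and the Future is only a stand-in for the HTTP call and DB writes from the question.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for ClickActor; the Future replaces the real work.
class ReplyingActor extends Actor {
  import context.dispatcher // ExecutionContext for the Future and for pipeTo

  def receive: Receive = {
    case lastrun: Long =>
      // Simulated asynchronous work that produces the new "last run" value.
      val finished: Future[Long] = Future {
        Thread.sleep(100)
        lastrun + 1
      }
      // sender() is evaluated right here, while the message is still being
      // processed; pipeTo sends the result to that reference on completion.
      finished.pipeTo(sender())
  }
}

object PipeToDemo {
  def run(): Long = {
    val system = ActorSystem("pipe-demo")
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      val reply = system.actorOf(Props[ReplyingActor]()) ? 41L
      Await.result(reply, timeout.duration).asInstanceOf[Long]
    } finally {
      system.terminate()
    }
  }
}
```

With this shape the asker gets its Long back instead of timing out. If the future fails, pipeTo delivers an akka.actor.Status.Failure to the asker, so the ask fails with the original exception rather than waiting for the timeout.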
Chris K answered Oct 15 '22 01:10