
Akka Actor - wait for some time to expect a message, otherwise send a message out

Tags: scala, akka, actor

Is it possible to make an Actor wait for X amount of seconds to receive any message, and if a message is received, process it as usual, otherwise send a message to some other Actor (pre-determined in the constructor)?

adelbertc asked Oct 05 '12 06:10


3 Answers

It's possible: have a look at Akka's "ask" and "Await", together with TimeoutException. But keep in mind that blocking inside an actor is a very bad idea, since the actor can't handle any other messages while it is blocked. It also ties up one of Akka's processing threads.

A better approach is to send the message (fire and forget) and schedule a timeout event using the Akka scheduler. When the response arrives, cancel that event, or set a flag so that the timeout is ignored if the reply actually came in time.
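A minimal sketch of the scheduler approach, assuming the classic actor API of Akka 2.1 or later (where durations come from scala.concurrent.duration). The names Waiter, TimedOut, NothingArrived and handle are made up for illustration. Switching behaviour with context.become plays the role of the "flag": a timeout message that was already enqueued before the cancel is simply ignored.

```scala
import akka.actor.{Actor, ActorRef, Cancellable}
import scala.concurrent.duration._

object Waiter {
  case object TimedOut       // hypothetical: sent to self by the scheduler
  case object NothingArrived // hypothetical: sent to the fallback actor
}

class Waiter(fallback: ActorRef, timeout: FiniteDuration) extends Actor {
  import Waiter._
  import context.dispatcher // ExecutionContext required by the scheduler

  private var timer: Option[Cancellable] = None

  // Arm a one-shot timer as soon as the actor starts.
  override def preStart(): Unit =
    timer = Some(context.system.scheduler.scheduleOnce(timeout, self, TimedOut))

  // Make sure the timer does not outlive the actor.
  override def postStop(): Unit = timer.foreach(_.cancel())

  def receive: Receive = waiting

  // Initial behaviour: whichever comes first wins, the timer or a real message.
  private def waiting: Receive = {
    case TimedOut =>
      fallback ! NothingArrived
      context.become(running)
    case msg =>
      timer.foreach(_.cancel())
      context.become(running)
      handle(msg)
  }

  // Normal behaviour: a stale TimedOut that slipped past cancel() is dropped.
  private def running: Receive = {
    case TimedOut => // ignore
    case msg      => handle(msg)
  }

  // Placeholder for the real message processing.
  private def handle(msg: Any): Unit = println(s"processing $msg")
}
```

Nothing blocks here: the actor stays free to process messages while the timer is pending.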

Tomasz Nurkiewicz answered Nov 01 '22 18:11


Yes, if you want to wait for any message, you simply set a receiveTimeout: http://doc.akka.io/docs/akka/current/scala/actors.html#receive-timeout

(The docs are slightly misleading here: you can also set the receiveTimeout again after every message.)
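A rough sketch of the receive-timeout approach, again assuming the classic actor API of Akka 2.1 or later. IdleWatcher and Idle are hypothetical names. Since the question only asks about waiting for a first message, this version switches the timeout off once anything has arrived; leaving it set would make the actor report every later idle period as well.

```scala
import akka.actor.{Actor, ActorRef, ReceiveTimeout}
import scala.concurrent.duration._

object IdleWatcher {
  case object Idle // hypothetical: sent to the fallback actor on timeout
}

class IdleWatcher(fallback: ActorRef, idleFor: FiniteDuration) extends Actor {
  import IdleWatcher._

  // Ask Akka to deliver ReceiveTimeout if no message arrives within idleFor.
  context.setReceiveTimeout(idleFor)

  def receive: Receive = {
    case ReceiveTimeout =>
      // Nothing arrived in time: disable the timeout and notify the other actor.
      context.setReceiveTimeout(Duration.Undefined)
      fallback ! Idle
    case msg =>
      // A message arrived in time: disable the timeout and process as usual.
      context.setReceiveTimeout(Duration.Undefined)
      println(s"processing $msg")
  }
}
```

To keep watching for idleness after each message instead, drop the setReceiveTimeout call in the second case; the timeout then restarts on every received message.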

Viktor Klang answered Nov 01 '22 19:11


It might be overkill, but you could check out the Finite State Machine (FSM) trait.

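// Written against Akka 2.0, where the duration DSL lives in akka.util.duration.
// From Akka 2.1 on, import scala.concurrent.duration._ instead.
import akka._
import actor._
import util._
import duration._
import Impatient._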

object Impatient {
  sealed trait State
  case object WaitingForMessage extends State
  case object MessageReceived extends State
  case object TimeoutExpired extends State

  sealed trait Data
  case object Uninitialized extends Data

  // Incoming message
  case object Message
}

class Impatient(receiver: ActorRef) extends Actor with FSM[State, Data] {
  startWith(WaitingForMessage, Uninitialized)

  when(WaitingForMessage, stateTimeout = 3 seconds) {
    case Event(StateTimeout, data) => goto(TimeoutExpired) using data // data is usually modified here
    case Event(Message, data) => goto(MessageReceived) using data // data is usually modified here
  }

  onTransition {
    case WaitingForMessage -> MessageReceived => stateData match {
      case data => log.info("Received message: " + data)
    }
    case WaitingForMessage -> TimeoutExpired => receiver ! TimeoutExpired
  }

  when(MessageReceived) {
    case _ => stay
  }

  when(TimeoutExpired) {
    case _ => stay
  }

  initialize
}

Here it is in action:

object Main extends App {
  import akka._
  import actor._
  import Impatient._

  val system = ActorSystem("System")

  val receiver = system.actorOf(Props(new Actor with ActorLogging {
    def receive = {
      case TimeoutExpired => log.warning("Timeout expired")
    }
  }))

  val impatient = system.actorOf(Props(new Impatient(receiver)), name = "Impatient")
  impatient ! Message

  val impatient2 = system.actorOf(Props(new Impatient(receiver)), name = "Impatient2")
  Thread.sleep(4000)
  impatient2 ! Message

  system.shutdown()
}
agilesteel answered Nov 01 '22 18:11