Akka Actor how to only process the latest message

Tags: scala, akka

Say I am sending messages to an Actor. While it is processing one message, several more messages may arrive. When it is ready to process the next message, I want it to process only the latest one, because the earlier ones have become obsolete. How can I best achieve this?

Using the Scala Actors library I was able to achieve this by first checking the actor's state from my sender, as follows:

if (myActor.getState != Runnable)
  myActor ! message     

But I don't think I can do such a test in Akka.

user79074 asked Jan 30 '14 11:01


2 Answers

There is no need to implement your own mailbox. At all.

I removed a lot of text to let this piece of code speak for itself:

import akka.actor.Actor

// Either implement "equals" so that every job is unique (the default) or do another comparison in the match.
class Work
case class DoWork(work: Work)

class WorkerActor extends Actor {
  // Left as an exercise for the reader, it clearly should do some work.
  def perform(work: Work): Unit = ()

  def lookingForWork: Receive = {
    case w: Work =>
      self forward DoWork(w)
      context become prepareToDoWork(w)
  }

  def prepareToDoWork(work: Work): Receive = {
    case DoWork(`work`) =>
      // No new work, so perform this one
      perform(work)
      // Now we're ready to look for new work
      context become lookingForWork
    case DoWork(_) =>
      // Discard work that we don't need to do anymore
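    case w2: Work =>
      // Prepare to do this newer work instead. Queue a marker for it as well,
      // otherwise no DoWork(w2) would ever arrive and w2 would never be performed.
      self forward DoWork(w2)
      context become prepareToDoWork(w2)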
  }

  //We start out as looking for work
  def receive = lookingForWork
}

This means that work will only be performed if there is no newer work in the mailbox.
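
To see why the marker message guarantees "latest wins", here is a minimal sketch that simulates the mailbox with a plain FIFO queue, without Akka. It assumes a DoWork marker is queued for every incoming Work; the names LatestWorkSimulation, Msg, run, and the id field are hypothetical and exist only for this illustration.

```scala
import scala.collection.mutable

object LatestWorkSimulation {
  sealed trait Msg
  final case class Work(id: Int) extends Msg
  final case class DoWork(work: Work) extends Msg

  /** Feeds `initial` through a FIFO mailbox and returns the ids of the work that was performed. */
  def run(initial: Seq[Work]): List[Int] = {
    val mailbox = mutable.Queue[Msg](initial: _*)
    val performed = mutable.ListBuffer.empty[Int]
    // None models "lookingForWork", Some(w) models "prepareToDoWork(w)"
    var pending: Option[Work] = None

    while (mailbox.nonEmpty) {
      (mailbox.dequeue(), pending) match {
        case (w: Work, _) =>
          // Queue a marker behind everything already in the mailbox
          mailbox.enqueue(DoWork(w))
          pending = Some(w)
        case (DoWork(w), Some(p)) if w == p =>
          // The marker for the pending work arrived before any newer work
          performed += w.id
          pending = None
        case (DoWork(_), _) =>
          // Stale marker: newer work replaced this one, so discard it
          ()
      }
    }
    performed.toList
  }
}
```

With three jobs already queued, LatestWorkSimulation.run(Seq(Work(1), Work(2), Work(3))) returns List(3): the markers for the first two jobs arrive after newer work has replaced them and are dropped.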

Viktor Klang answered Oct 06 '22 00:10


You could implement your own mailbox. This approach does not affect your actor implementation. See the other answer for a solution that changes the actor implementation instead of using a custom mailbox.

An implementation of a mailbox that drops old messages on enqueue:

package akka.actor.test 

import akka.actor.{ ActorRef, ActorSystem, DeadLetter, InternalActorRef }
import com.typesafe.config.Config
import akka.dispatch.{Envelope, MessageQueue}

class SingleMessageMailbox extends akka.dispatch.MailboxType {

  // This constructor signature must exist, it will be called by Akka
  def this(settings: ActorSystem.Settings, config: Config) = this()

  // The create method is called to create the MessageQueue
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new MessageQueue {
      val message = new java.util.concurrent.atomic.AtomicReference[Envelope]

      final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
        Option(message.get) foreach {deadLetters.enqueue(owner, _)}

      def enqueue(receiver: ActorRef, handle: Envelope): Unit =
        for {e <- Option(message.getAndSet(handle))} 
          receiver.asInstanceOf[InternalActorRef].
            provider.deadLetters.
            tell(DeadLetter(e.message, e.sender, receiver), e.sender)

      def dequeue(): Envelope = message.getAndSet(null)

      def numberOfMessages: Int = Option(message.get).size

      def hasMessages: Boolean = message.get != null
    }
}

Note that I had to put this class inside the akka.actor package hierarchy (here akka.actor.test) so that it can use InternalActorRef to send the old message to dead letters, as is done for BoundedQueueBasedMessageQueue.

If you want to just skip old messages you could implement enqueue like this:

def enqueue(receiver: ActorRef, handle: Envelope): Unit = message.set(handle)
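
The single-slot idea behind both variants can be sketched independently of Akka. This is only an illustration of the getAndSet semantics the mailbox relies on; LatestSlot and its methods are hypothetical names, not part of any Akka API.

```scala
import java.util.concurrent.atomic.AtomicReference

// A one-element "mailbox": putting a new value displaces the previous one.
final class LatestSlot[A <: AnyRef] {
  private val slot = new AtomicReference[A]()

  // Stores `a` and returns the value it replaced, if any
  // (this is the value the mailbox above would send to dead letters).
  def put(a: A): Option[A] = Option(slot.getAndSet(a))

  // Atomically removes and returns the current value, if any.
  def take(): Option[A] = Option(slot.getAndSet(null.asInstanceOf[A]))

  // 0 or 1, mirroring numberOfMessages in the mailbox.
  def size: Int = if (slot.get == null) 0 else 1
}
```

Ignoring the result of put corresponds to the "just skip old messages" variant, while using it corresponds to forwarding the displaced message to dead letters.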

Usage:

object Test extends App {
  import akka.actor._
  import com.typesafe.config.ConfigFactory

  // you should use your config file instead of ConfigFactory.parseString
  val actorSystem: ActorSystem =
    ActorSystem("default", ConfigFactory.parseString(
"""
  akka.daemonic=on
  myMailbox.mailbox-type = "akka.actor.test.SingleMessageMailbox"
"""))

  class EchoActor extends Actor {
    def receive = {
      case m => println(m); Thread.sleep(500)
    }
  }

  val actor = actorSystem.actorOf(Props[EchoActor].withMailbox("myMailbox"))

  for {i <- 1 to 10} {
    actor ! i
    Thread.sleep(100)
  }

  Thread.sleep(1000)

}

Test:

$ sbt run
1
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
5
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
10

See also the Akka documentation on Mailboxes.

senia answered Oct 05 '22 23:10