Say I am sending messages to an Actor, and while it is processing one message several more messages may arrive. When it is ready to process the next message, I want it to process only the latest one, because the earlier ones have become obsolete. How can I best achieve this?
Using the Scala Actors library, I was able to achieve this by first checking from my sender, as follows:
if (myActor.getState != Runnable)
  myActor ! message
But I don't think I can do such a test in Akka.
An actor can stop itself by returning Behaviors.stopped as the next behavior. A child actor can be forced to stop after it finishes processing its current message by using the stop method of the ActorContext from the parent actor.
Behind the scenes Akka will run sets of actors on sets of real threads, where typically many actors share one thread, and subsequent invocations of one actor may end up being processed on different threads.
In Akka, you can stop actors by invoking the stop() method of either the ActorContext or the ActorSystem class. ActorContext is used to stop a child actor, and ActorSystem is used to stop a top-level actor.
There is no need to implement your own mailbox. At all.
Removed a lot of text and let this piece of code speak for itself:
import akka.actor.Actor

// Either implement "equals" so that every job is unique (the default) or do another comparison in the match.
class Work
case class DoWork(work: Work)

class WorkerActor extends Actor {
  // Left as an exercise for the reader; it clearly should do some work.
  def perform(work: Work): Unit = ()

  def lookingForWork: Receive = {
    case w: Work =>
      // The DoWork marker lands behind anything already in the mailbox
      self forward DoWork(w)
      context become prepareToDoWork(w)
  }

  def prepareToDoWork(work: Work): Receive = {
    case DoWork(`work`) =>
      // No new work, so perform this one
      perform(work)
      // Now we're ready to look for new work
      context become lookingForWork
    case DoWork(_) =>
      // Discard work that we don't need to do anymore
    case w2: Work =>
      // Prepare to do this newer work instead. It needs its own DoWork marker,
      // otherwise nothing would ever trigger perform(w2).
      self forward DoWork(w2)
      context become prepareToDoWork(w2)
  }

  // We start out looking for work
  def receive = lookingForWork
}
This means that work will only be performed if there is no newer work in the mailbox.
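To see why the marker message gives "latest wins" behaviour, here is a rough, Akka-free simulation of the same state machine over a plain queue. It is only a sketch: `LatestOnlySimulation`, `run`, and the `id` field are hypothetical names for illustration, and it assumes that every received `Work` enqueues its own `DoWork` marker behind whatever is already waiting.

```scala
import scala.collection.mutable

object LatestOnlySimulation {
  sealed trait Msg
  final case class Work(id: Int) extends Msg
  final case class DoWork(work: Work) extends Msg

  // Feeds the given messages through the state machine and returns
  // the ids of the work items that were actually performed, in order.
  def run(initial: Seq[Msg]): List[Int] = {
    val mailbox = mutable.Queue.empty[Msg]
    mailbox ++= initial
    val performed = mutable.ListBuffer.empty[Int]
    // None models "lookingForWork", Some(w) models "prepareToDoWork(w)"
    var pending: Option[Work] = None

    while (mailbox.nonEmpty) {
      mailbox.dequeue() match {
        case w: Work =>
          // Remember the newest work and queue its marker at the back
          mailbox.enqueue(DoWork(w))
          pending = Some(w)
        case DoWork(w) if pending.contains(w) =>
          // No newer work arrived before the marker: perform it
          performed += w.id
          pending = None
        case DoWork(_) =>
          // Marker for work that has been superseded: drop it
          ()
      }
    }
    performed.toList
  }
}
```

With three work items already waiting, `LatestOnlySimulation.run(Seq(Work(1), Work(2), Work(3)))` performs only the last one, because the markers for the first two are reached after newer work has replaced them.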
You could implement your own mailbox; this approach does not affect your actor implementation. See this answer for a solution that changes the actor implementation instead of using a custom mailbox.
Implementation of a mailbox that drops old messages on enqueue:
package akka.actor.test

import akka.actor.{ ActorRef, ActorSystem, DeadLetter, InternalActorRef }
import com.typesafe.config.Config
import akka.dispatch.{ Envelope, MessageQueue }

class SingleMessageMailbox extends akka.dispatch.MailboxType {
  // This constructor signature must exist, it will be called by Akka
  def this(settings: ActorSystem.Settings, config: Config) = this()

  // The create method is called to create the MessageQueue
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new MessageQueue {
      // Single slot holding at most one pending envelope
      val message = new java.util.concurrent.atomic.AtomicReference[Envelope]

      final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
        Option(message.get) foreach { deadLetters.enqueue(owner, _) }

      // Store the new envelope; if one was already waiting, send it to dead letters
      def enqueue(receiver: ActorRef, handle: Envelope): Unit =
        for { e <- Option(message.getAndSet(handle)) }
          receiver.asInstanceOf[InternalActorRef].
            provider.deadLetters.
            tell(DeadLetter(e.message, e.sender, receiver), e.sender)

      def dequeue(): Envelope = message.getAndSet(null)
      def numberOfMessages: Int = Option(message.get).size
      def hasMessages: Boolean = message.get != null
    }
}
Note that I had to put this class in the akka.actor package tree (akka.actor.test above) so that it can send the old message to dead letters using InternalActorRef, as is done for BoundedQueueBasedMessageQueue.
If you just want to skip old messages, you could implement enqueue like this:
def enqueue(receiver: ActorRef, handle: Envelope): Unit = message.set(handle)
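The difference between the two enqueue variants comes down to `AtomicReference.getAndSet` versus `set`. The following is a minimal standalone sketch of that single-slot behaviour using only the JDK; `SingleSlotDemo`, `SingleSlot`, and its method names are hypothetical and not part of Akka.

```scala
import java.util.concurrent.atomic.AtomicReference

object SingleSlotDemo {
  // A one-element "queue": a new item always replaces the waiting one.
  final class SingleSlot[A <: AnyRef] {
    private val slot = new AtomicReference[A]

    // Like the dead-letter variant: returns the displaced item, if any,
    // so the caller can report it somewhere.
    def offerReplacing(a: A): Option[A] = Option(slot.getAndSet(a))

    // Like the silent variant: the displaced item is simply forgotten.
    def offerSilently(a: A): Unit = slot.set(a)

    // Like dequeue(): atomically takes the item and empties the slot.
    def take(): Option[A] = Option(slot.getAndSet(null.asInstanceOf[A]))

    def size: Int = if (slot.get == null) 0 else 1
  }
}
```

For example, after `offerReplacing("a")` and then `offerReplacing("b")` on a `SingleSlot[String]`, the second call hands back `Some("a")` and a following `take()` yields `Some("b")`. With `offerSilently`, the earlier value is overwritten with no trace.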
Usage:
object Test extends App {
  import akka.actor._
  import com.typesafe.config.ConfigFactory

  // you should use your config file instead of ConfigFactory.parseString
  val actorSystem: ActorSystem =
    ActorSystem("default", ConfigFactory.parseString(
      """
      akka.daemonic=on
      myMailbox.mailbox-type = "akka.actor.test.SingleMessageMailbox"
      """))

  class EchoActor extends Actor {
    def receive = {
      case m => println(m); Thread.sleep(500)
    }
  }

  val actor = actorSystem.actorOf(Props[EchoActor].withMailbox("myMailbox"))

  for {i <- 1 to 10} {
    actor ! i
    Thread.sleep(100)
  }

  Thread.sleep(1000)
}
Test:
$ sbt run
1
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
5
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
10
See also akka/Mailboxes.