I need to publish messages of different types to the event stream, and those messages should have different priorities. For example, if 10 messages of type A have been posted and one message of type B is posted after them, and B has a higher priority than A, then message B should be picked up by the next actor even though there are 10 messages of type A in the queue.
I have read about prioritized messages here and created my simple implementation of that mailbox:
class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(
  PriorityGenerator {
    case ServerPermanentlyDead => println("Priority:0"); 0
    case ServerDead => println("Priority:1"); 1
    case _ => println("Default priority"); 10
  }
)
Then I configured it in application.conf:
akka {
  actor {
    prio-dispatcher {
      type = "Dispatcher"
      mailbox-type = "mailbox.PrioritizedMailbox"
    }
  }
}
and wired it into my actor:
private val myActor = actors.actorOf(
  Props[MyEventHandler[T]].
    withRouter(RoundRobinRouter(HIVE)).
    withDispatcher("akka.actor.prio-dispatcher").
    withCreator(
      new Creator[Actor] {
        def create() = new MyEventHandler(storage)
      }), name = "eventHandler")
I'm using ActorSystem.eventStream.publish in order to send messages, and my actor is subscribed to it (I can see in logs that messages are processed, but in FIFO order).
However, it looks like this is not enough, because I have never seen messages like "Default priority" in the logs/console. Am I missing something here? Does the described approach work with event streams, or only when a message is sent directly to an actor? And how do I get prioritized messages with eventStream?
Your problem is that your actors are insanely fast, so messages get processed before they have time to queue up. A priority mailbox can only reorder messages that are waiting in it at the same time, so with an (almost) empty mailbox no prioritization can happen. The example below proves the point:
import akka.actor.{ ActorSystem, Props }
import akka.dispatch.{ PriorityGenerator, UnboundedPriorityMailbox }
import com.typesafe.config.Config

trait Foo
case object X extends Foo
case object Y extends Foo
case object Z extends Foo

// Lower values are dequeued first; anything unmatched gets the lowest priority.
class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config)
  extends UnboundedPriorityMailbox(
    PriorityGenerator {
      case X ⇒ 0
      case Y ⇒ 1
      case Z ⇒ 2
      case _ ⇒ 10
    })

// The rest runs inside an akka.testkit.TestKit test, which provides testActor and expectMsg.
val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString(
  """ prio-dispatcher {
    type = "Dispatcher"
    mailbox-type = "%s"
  }""".format(classOf[PrioritizedMailbox].getName)))
val latch = new java.util.concurrent.CountDownLatch(1)
val a = s.actorOf(Props(new akka.actor.Actor {
  latch.await // Just wait here so that the messages are queued up inside the mailbox
  def receive = {
    case any ⇒ /*println("Processing: " + any);*/ sender ! any
  }
}).withDispatcher("prio-dispatcher"))
implicit val sender = testActor
a ! "pig"
a ! Y
a ! Z
a ! Y
a ! X
a ! Z
a ! X
a ! "dog"
latch.countDown()
// All eight messages were waiting in the mailbox, so they come back in priority order.
Seq(X, X, Y, Y, Z, Z, "pig", "dog") foreach { x => expectMsg(x) }
s.shutdown()
This test passes with flying colors.
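To see the same effect without Akka, here is a minimal sketch that uses a plain java.util.concurrent.PriorityBlockingQueue as a stand-in for the mailbox. The names Msg, consumeEagerly and consumeAfterBacklog are made up for this illustration and are not part of any Akka API. It only demonstrates the general point: a priority queue can reorder elements only when several of them are waiting at once.

```scala
import java.util.concurrent.PriorityBlockingQueue

object MailboxOrderingSketch {
  // Hypothetical message type; a lower priority value is served first.
  final case class Msg(name: String, priority: Int)

  // Stand-in for a priority mailbox (an assumption of this sketch, not Akka code).
  private def newQueue(): PriorityBlockingQueue[Msg] =
    new PriorityBlockingQueue[Msg](11, Ordering.by[Msg, Int](_.priority))

  // The consumer keeps up with the producer: every message is taken right
  // after it is put, so the queue never holds more than one element.
  def consumeEagerly(msgs: Seq[Msg]): Seq[String] = {
    val q = newQueue()
    msgs.map { m => q.put(m); q.take().name }
  }

  // The consumer starts only after everything has been enqueued, so the
  // queue sees all messages at once and can hand them out by priority.
  def consumeAfterBacklog(msgs: Seq[Msg]): Seq[String] = {
    val q = newQueue()
    msgs.foreach(m => q.put(m))
    Seq.fill(msgs.size)(q.take().name)
  }

  def main(args: Array[String]): Unit = {
    val msgs = Seq(Msg("low", 10), Msg("mid", 5), Msg("high", 0))
    println(consumeEagerly(msgs))      // List(low, mid, high): plain FIFO
    println(consumeAfterBacklog(msgs)) // List(high, mid, low): priority order
  }
}
```

The priorities in the sketch are deliberately distinct, because PriorityBlockingQueue makes no ordering guarantee between elements that compare as equal.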