I'm trying to implement a Pub/Sub trait to mix into other akka actors using a stackable trait.
Here is what I came up with:
trait PubSubActor extends Actor {
abstract override def receive =
super.receive orElse {
case Subscribe(topic) => /* ... */
case Publish(topic, msg) => /* ... */
}
}
class MyActor extends Actor with PubSubActor {
override def receive = {
case SomeMessage(a, b, c) => /* ... */
}
}
At which point, the compiler throws back an error: error: overriding method receive in trait MyActor... method receive needs `abstract override' modifiers.
Can you explain to me why this isn't working? How can I fix it so it works?
Thanks!
UPDATE
The following works:
trait PubSubActor extends Actor {
abstract override def receive =
super.receive orElse {
case Subscribe(topic) => /* ... */
case Publish(topic, msg) => /* ... */
}
}
class MyActor extends Actor {
override def receive = {
case SomeMessage(a, b, c) => /* ... */
}
}
class MyActorImpl extends MyActor with PubSubActor
But why? Why can I get the behavior I want this way but not the other? Any reasons? I can't seem to figure out the underlying difference between these two samples that makes the difference.
There's a simple and concise solution:
Define a Receiving trait that chains multiple receive functions using orElse
:
trait Receiving {
var receivers: Receive = Actor.emptyBehavior
def receiver(next: Actor.Receive) { receivers = receivers orElse next }
def receive = receivers // Actor.receive definition
}
Using this in actors is easy:
trait PubSubActor extends Receiving {
receiver {
case Publish => /* I'm the first to handle messages */
}
}
class MyActor extends PubSubActor with Receiving {
receiver {
case SomeMessage => /* PubSubActor didn't handle, I receive the message */
}
}
First PubSubActor's receive will be called. If message wasn't handled it will be passed to MyActor's receive.
You can certainly achieve what you are looking for using Akka's composable actor feature. This is described a bit in Extending Actors using PartialFunction chaining.
First, the infrastructure code (straight from the docs):
class PartialFunctionBuilder[A, B] {
import scala.collection.immutable.Vector
// Abbreviate to make code fit
type PF = PartialFunction[A, B]
private var pfsOption: Option[Vector[PF]] = Some(Vector.empty)
private def mapPfs[C](f: Vector[PF] => (Option[Vector[PF]], C)): C = {
pfsOption.fold(throw new IllegalStateException("Already built"))(f) match {
case (newPfsOption, result) => {
pfsOption = newPfsOption
result
}
}
}
def +=(pf: PF): Unit =
mapPfs { case pfs => (Some(pfs :+ pf), ()) }
def result(): PF =
mapPfs { case pfs => (None, pfs.foldLeft[PF](Map.empty) { _ orElse _ }) }
}
trait ComposableActor extends Actor {
protected lazy val receiveBuilder = new PartialFunctionBuilder[Any, Unit]
final def receive = receiveBuilder.result()
}
Then the behaviors you want to be able to compose into actors:
trait PubSubActor { self:ComposableActor =>
receiveBuilder += {
case Subscribe(topic) => /* ... */
case Publish(topic, msg) => /* ... */
}
}
trait MyActor { self:ComposableActor =>
receiveBuilder += {
case SomeMessage(a, b, c) => /* ... */
}
}
And lastly, an actual actor that uses these composable behaviors:
class MyActorImpl extends ComposableActor with PubSubActor with MyActor
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With