It was me being stupid: I wasn't passing the indexer props to system.actorOf when creating the actor. I'll leave the question here in case anyone benefits from it.
I'm creating a singleton and sending a message like this:
val indexerProps = ClusterSingletonManager.props(
  had => Props(classOf[SingleCoreIndexer], dataProvider, publisher, name),
  name, End, None)
val coreIndexer = system.actorOf(indexerProps, name)
//val coreIndexer = system.actorOf(Props(classOf[SingleCoreIndexer], dataProvider, publisher, name))
coreIndexer ! "start_indexing"
The commented-out line shows the non-singleton props, which work fine.
When I run the app I get the following warning:
[WARN] [06/21/2013 11:55:32.443] [deadcoreindexerstest-akka.actor.default-dispatcher-5] [akka://deadcoreindexerstest/user/node1] unhandled event start_indexing in state Start
All of the other functionality stops working, which is consistent with the warning: the "coreIndexer" actor is apparently not receiving the "start_indexing" message.
More code:
class Indexer(systemCreator: SystemCreator, publisherProps: Props, dataProviderProps: Props, name: String) {
  def start {
    val system = systemCreator.create
    val dataProvider = system.actorOf(dataProviderProps)
    val publisher = system.actorOf(publisherProps)
    val indexerProps = ClusterSingletonManager.props(
      singletonProps = had => Props(classOf[SingleCoreIndexer], dataProvider, publisher, name),
      singletonName = "aaa",
      terminationMessage = End,
      role = None
    )
    val coreIndexer = system.actorOf(Props(classOf[SingleCoreIndexer], dataProvider, publisher, name))
    coreIndexer ! "start_indexing"
  }
}
class SingleCoreIndexer(dataProvider: ActorRef, publisher: ActorRef, name: String) extends Actor {
  def receive = {
    case "start_indexing" => {
      println("Single core indexer starting indexing")
      dataProvider ! new NextBatchOfDataPlease
    }
    case BatchOfData(data) => {
      publisher ! (name, data)
      self ! "next_batch"
    }
    case "next_batch" => {
      dataProvider ! new NextBatchOfDataPlease
    }
  }
}
It looks like I was sending a message to the manager and not the singleton. However, when I send messages to the singleton, nothing happens:
class Indexer(systemCreator: SystemCreator, publisherProps: Props, dataProviderProps: Props, name: String) {
  def start {
    val system = systemCreator.create
    val dataProvider = system.actorOf(dataProviderProps)
    val publisher = system.actorOf(publisherProps)
    val indexerProps = ClusterSingletonManager.props(
      singletonProps = had => Props(classOf[SingleCoreIndexer], dataProvider, publisher, name),
      singletonName = "singlecoreindexer",
      terminationMessage = End,
      role = None
    )
    system.actorOf(Props(classOf[SingleCoreIndexer], dataProvider, publisher, name))
    val coreIndexer = system.actorSelection(s"/user/$name/singlecoreindexer")
    coreIndexer ! "start_indexing"
  }
}
I know this is already resolved, but even with the info provided here, it still took me a while longer to figure out how to send messages to the singleton, so I figured I'd leave what I found here.
The two key concepts not well explained by the Cluster Singleton documentation are that the actor created from ClusterSingletonManager.props is the parent of the actual singleton instance, and that ClusterSingletonProxy is what you should use to talk to the singleton.
Each node in the cluster will create the singleton manager, and the one that finally wins out as being the oldest is the parent of the singleton you actually want to talk to. ClusterSingletonProxy ensures that you are talking to the actual singleton and that, even if the singleton is temporarily unavailable or migrates to another node, you are always talking to the proper instance.
Given that information, the code should be:
class Indexer(systemCreator: SystemCreator, publisherProps: Props, dataProviderProps: Props, name: String) {
  def start {
    val system = systemCreator.create
    val dataProvider = system.actorOf(dataProviderProps)
    val publisher = system.actorOf(publisherProps)
    val indexerProps = ClusterSingletonManager.props(
      singletonProps = had => Props(classOf[SingleCoreIndexer], dataProvider, publisher, name),
      singletonName = "singlecoreindexer",
      terminationMessage = End,
      role = None
    )
    // Start the manager from indexerProps, not from the SingleCoreIndexer props directly
    val singletonManager = system.actorOf(indexerProps, name)
    // The singleton is a child of the manager, named by singletonName
    val indexerPath = (singletonManager.path / "singlecoreindexer")
    val coreIndexer = system.actorOf(
      ClusterSingletonProxy.props(indexerPath, None),
      s"$name-proxy"
    )
    coreIndexer ! "start_indexing"
  }
}
The issue you are seeing (I think) stems from you sending a message to the ClusterSingletonManager instead of to your actual actor that is sitting underneath it. Try looking up the actor underneath it by name (actorFor) and it should work.
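As a rough sketch of that lookup: assuming the manager was started as a top-level actor named node1 (the name that appears in the warning) and the singleton was given singletonName "singlecoreindexer", the singleton's local path is just the manager's path with the singleton name appended. SingletonPaths and childPath are hypothetical helper names for illustration, not part of Akka.

```scala
// Hypothetical helper (not part of Akka): builds the local path of the
// singleton actor that lives underneath a top-level ClusterSingletonManager.
object SingletonPaths {
  def childPath(managerName: String, singletonName: String): String =
    s"/user/$managerName/$singletonName"
}

// With an ActorSystem in scope, the lookup would then be along the lines of
//   system.actorSelection(SingletonPaths.childPath("node1", "singlecoreindexer")) ! "start_indexing"
// or system.actorFor(...) on older Akka versions.
```

Note that a local path like this can only resolve on the node that is currently hosting the singleton.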