
Akka Singleton - not accepting messages

It was me being stupid: I wasn't passing indexerProps to system.actorOf. I'll leave the question here in case anyone benefits from it.

I'm creating a singleton and sending a message like this:

val indexerProps = ClusterSingletonManager.props(had => Props(
    classOf[SingleCoreIndexer], dataProvider, publisher, name), name, End, None)

val coreIndexer = system.actorOf(indexerProps, name)
//val coreIndexer = system.actorOf(Props(classOf[SingleCoreIndexer], dataProvider, publisher, name))

coreIndexer ! "start_indexing"

The commented-out line shows the non-singleton props, which work fine.

When I run the app I get the following errors:

[WARN] [06/21/2013 11:55:32.443] [deadcoreindexerstest-akka.actor.default-dispatcher-5] [akka://deadcoreindexerstest/user/node1] unhandled event start_indexing in state Start

All of the other functionality stops working, which is consistent with the warning: the "coreIndexer" actor apparently never receives the "start_indexing" message.

More code:

class Indexer(systemCreator: SystemCreator, publisherProps: Props, dataProviderProps: Props, name: String) {

    def start {
        val system = systemCreator.create
        val dataProvider = system.actorOf(dataProviderProps)
        val publisher = system.actorOf(publisherProps)

        val indexerProps = ClusterSingletonManager.props(
            singletonProps = had => Props(classOf[SingleCoreIndexer], dataProvider, publisher, name),
            singletonName = "aaa",
            terminationMessage = End,
            role = None
            )

        val coreIndexer = system.actorOf(Props(classOf[SingleCoreIndexer], dataProvider, publisher, name))
        coreIndexer ! "start_indexing"
    }
}



class SingleCoreIndexer(dataProvider: ActorRef, publisher: ActorRef, name: String) extends Actor {

    def receive = {

        case "start_indexing" => {
            println("Single core indexer starting indexing")
            dataProvider ! new NextBatchOfDataPlease
        }

        case BatchOfData(data) => {
            publisher ! (name, data)
            self ! "next_batch"
        }

        case "next_batch" => {
            dataProvider ! new NextBatchOfDataPlease
        }
    }
}

It looks like I was sending a message to the manager and not the singleton. However, when I send messages to the singleton, nothing happens:

class Indexer(systemCreator: SystemCreator, publisherProps: Props, dataProviderProps: Props, name: String) {

    def start {
        val system = systemCreator.create
        val dataProvider = system.actorOf(dataProviderProps)
        val publisher = system.actorOf(publisherProps)

        val indexerProps = ClusterSingletonManager.props(
            singletonProps = had => Props(classOf[SingleCoreIndexer], dataProvider, publisher, name),
            singletonName = "singlecoreindexer",
            terminationMessage = End,
            role = None
            )

        system.actorOf(Props(classOf[SingleCoreIndexer], dataProvider, publisher, name))
        val coreIndexer = system.actorSelection(s"/user/$name/singlecoreindexer")
        coreIndexer ! "start_indexing"
    }
}

Nick asked Jun 21 '13 11:06


2 Answers

I know this is already resolved, but even with the info provided here, it took me a while longer to figure out how to send messages to the singleton, so I figured I'd leave what I found here.

The two key concepts not well explained by the Cluster Singleton Documentation are that:

  1. The actor created with ClusterSingletonManager.props is the parent of the actual instance, and
  2. you should use that actor's address only to create a ClusterSingletonProxy

Each node in the cluster creates a singleton manager, and the one that ends up being the oldest is the parent of the singleton you actually want to talk to. ClusterSingletonProxy ensures that you are talking to the actual singleton and that, even if the singleton is temporarily unavailable or migrates to another node, you are always talking to the proper instance.

Given that information, the code should be:

class Indexer(systemCreator: SystemCreator, publisherProps: Props, dataProviderProps: Props, name: String) {

  def start {
    val system = systemCreator.create
    val dataProvider = system.actorOf(dataProviderProps)
    val publisher = system.actorOf(publisherProps)

    val indexerProps = ClusterSingletonManager.props(
      singletonProps = had => Props(classOf[SingleCoreIndexer], dataProvider, publisher, name),
      singletonName = "singlecoreindexer",
      terminationMessage = End,
      role = None
    )

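    // Create the manager from indexerProps (not the plain actor Props),
    // so that the singleton is started as the manager's child.
    val singletonManager = system.actorOf(indexerProps, name)

    // The singleton is a child of the manager, named by singletonName.
    // Assumes the Akka 2.3 contrib API, where the proxy takes a String path.
    val indexerPath = (singletonManager.path / "singlecoreindexer").toStringWithoutAddress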
    val coreIndexer = system.actorOf(
      ClusterSingletonProxy.props(indexerPath, None),
      s"$name-proxy"
    )

    coreIndexer ! "start_indexing"
  }
}

Arne Claassen answered Nov 22 '22 09:11


The issue you are seeing (I think) stems from you sending a message to the ClusterSingletonManager instead of to your actual actor that is sitting underneath it. Try looking up the actor underneath it by name (actorFor) and it should work.
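
A minimal sketch of such a lookup, assuming the manager was started under /user with the name node1 (the name that appears in the warning in the question) and singletonName = "singlecoreindexer". It uses actorSelection, which as far as I know later Akka versions recommend over actorFor. The object and helper names (SingletonLookup, singletonPath, lookup) are hypothetical and only there for illustration.

```scala
import akka.actor.{ActorSelection, ActorSystem}

object SingletonLookup {
  // Builds the local path of the singleton: the manager lives under /user,
  // and the singleton is its child, named by singletonName.
  def singletonPath(managerName: String, singletonName: String): String =
    s"/user/$managerName/$singletonName"

  // Resolves the path on the local node only. If the singleton currently
  // runs on another node, messages sent to this selection will not reach it.
  def lookup(system: ActorSystem, managerName: String, singletonName: String): ActorSelection =
    system.actorSelection(singletonPath(managerName, singletonName))
}
```

With that in place, something like SingletonLookup.lookup(system, "node1", "singlecoreindexer") ! "start_indexing" should reach the singleton rather than the manager, as long as it is hosted on the same node. In a multi-node cluster the ClusterSingletonProxy approach from the other answer is the more robust choice.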

cmbaxter answered Nov 22 '22 09:11