
How to configure downing in an Akka cluster when a singleton is present

I read in Akka's documentation that when using a cluster singleton one should avoid automatic downing. I don't understand how downing should be configured in that case. I understand that I can subscribe to cluster membership events and plan my strategy according to those messages. However, I don't understand how, in practice, that would differ from automatic downing.

When a node is partitioned from the cluster and automatic downing is used, the partitioned node will "think" that the rest of the cluster went missing and will start a cluster of its own (with its own singleton). On the other hand, I can't keep unreachable nodes in the unreachable state forever: the cluster won't reach convergence (new nodes won't be able to join), and if the partitioned node is the one hosting the singleton, a new singleton node won't be assigned. Therefore, according to my understanding, the only thing left to do is to remove unreachable nodes after some grace period, which is exactly what automatic downing does.

What am I missing here?

Mizh asked Jun 01 '15 13:06



1 Answer

Check out the code below. I have turned off the auto-down-unreachable-after feature, as the docs recommend. Instead, I implemented custom logic that behaves a bit differently. The key idea is that if a network partition happens, only the cluster nodes that are in the majority will take down an UnreachableMember, after a delay (5s here, which should be made configurable). The nodes in the minority, on the other hand, will keep treating their UnreachableMembers (which are the majority group) as unreachable and will not take them down to form an island. The majority idea is borrowed from MongoDB, and I don't think it is new in computer science.

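import akka.actor.{Actor, ActorLogging}
import akka.cluster.{Cluster, Member}
import akka.cluster.ClusterEvent._
import scala.concurrent.duration._

class ClusterListener extends Actor with ActorLogging {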

  val cluster = Cluster(context.system)
  var unreachableMember: Set[Member] = Set()

  // subscribe to reachability changes; preStart runs again on restart, so the actor re-subscribes
  override def preStart(): Unit = {
    //#subscribe
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[UnreachableMember], classOf[ReachableMember])
    //#subscribe
  }
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
      val state = cluster.state
      if (isMajority(state.members.size, state.unreachable.size)) {
        scheduletakeDown(member)
      }
    case ReachableMember(member) =>
      unreachableMember = unreachableMember - member
    case _: MemberEvent => // ignore
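    case "die" =>
      // down every member that is still marked unreachable, then forget them
      // so that a later "die" message does not try to down them again
      unreachableMember.foreach { member =>
        cluster.down(member.address)
      }
      unreachableMember = Set()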
  }

  // smallest number of nodes that forms a strict majority of n members
  private def majority(n: Int): Int = n / 2 + 1

  private def isMajority(total: Int, dead: Int): Boolean = {
    require(total > 0)
    require(dead >= 0)
    (total - dead) >= majority(total)
  }

  private def scheduletakeDown(member: Member) = {
    implicit val dispatcher = context.system.dispatcher
    unreachableMember = unreachableMember + member
    // TODO: make the 5-second delay configurable
    context.system.scheduler.scheduleOnce(5.seconds, self, "die")
  }

}
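
To make the majority rule easier to check by hand, here is a minimal sketch of the same arithmetic pulled out into a standalone object with no Akka dependency. The name QuorumSketch and the example cluster sizes are made up for illustration; only the majority / isMajority logic mirrors the actor above.

```scala
object QuorumSketch {
  // Smallest number of nodes that forms a strict majority of n members.
  def majority(n: Int): Int = n / 2 + 1

  // True when the members this side can still reach form a strict majority.
  def isMajority(total: Int, unreachable: Int): Boolean = {
    require(total > 0, "cluster must have at least one member")
    require(unreachable >= 0 && unreachable <= total, "unreachable must be within 0..total")
    (total - unreachable) >= majority(total)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical 5-node cluster split into 3 and 2.
    println(isMajority(total = 5, unreachable = 2)) // side that sees 3 nodes: true
    println(isMajority(total = 5, unreachable = 3)) // side that sees 2 nodes: false
    // Hypothetical 4-node cluster split evenly into 2 and 2.
    println(isMajority(total = 4, unreachable = 2)) // false on both sides
  }
}
```

Note the even split: with this rule neither half has a majority, so neither side downs the other and the members stay unreachable until someone intervenes manually. Running an odd number of nodes avoids that case.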
mingchuno answered Nov 19 '22 10:11