I read in Akka's documentation that when using Cluster Singleton one should avoid automatic downing. I don't understand how downing should be configured in that case. I understand that I can subscribe to cluster membership events and plan my strategy according to those messages. However, I don't understand how, in practice, that would differ from automatic downing.
When a node is somehow partitioned from the cluster and automatic downing is used, the partitioned node will "think" that the entire cluster went missing and start a cluster of its own (with its own singleton). On the other hand, I can't keep unreachable nodes in the unreachable state forever, because the cluster won't reach convergence (new nodes won't be able to join), and if the partitioned node is the one hosting the singleton, a new singleton node won't be assigned. Therefore, as far as I understand, the only thing left to do is to remove unreachable nodes after some grace period, which is exactly what automatic downing does.
What am I missing here?
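To make the partition scenario concrete, here is a small pure-Scala model (no Akka dependency) of what automatic downing does when a cluster splits in two. It assumes, as the Cluster Singleton documentation describes, that the singleton runs on the oldest member; the `Node` type, the `upNumber` ordering and the `AutoDownModel` functions are hypothetical names chosen for illustration, not Akka API.

```scala
// Hypothetical model of a cluster node: its address and join order (lower upNumber = older).
final case class Node(address: String, upNumber: Int)

object AutoDownModel {
  // The singleton runs on the oldest member that a side still considers part of the cluster.
  def singletonHost(members: Set[Node]): Option[Node] =
    if (members.isEmpty) None else Some(members.minBy(_.upNumber))

  // With automatic downing, once the timeout expires a side downs every member
  // it cannot reach, so only the members it can reach remain in its view.
  def afterAutoDown(all: Set[Node], reachable: Set[Node]): Set[Node] =
    all.filter(reachable.contains)

  // Singleton hosts on every side of a partition after each side has auto-downed the others.
  def singletonHosts(all: Set[Node], sides: Seq[Set[Node]]): Seq[Node] =
    sides.flatMap(side => singletonHost(afterAutoDown(all, side)))
}
```

For five nodes split into {node1, node2} and {node3, node4, node5}, `singletonHosts` returns node1 and node3: each side downs the other and then hosts its own singleton, which is the "two clusters, two singletons" outcome described in the question.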
Check out the code below. I have turned off the auto-down-unreachable-after feature, as the documentation says. Instead, I implemented custom logic that is a bit different from the usual approach. The key point of the code is that if a network partition happens, only the cluster nodes that form a majority will take down the members reported by UnreachableMember, after a grace period of 5 s (which should be made configurable). The nodes in the minority, on the other hand, treat their UnreachableMember nodes (which are the majority group) as unreachable and do not take them down, so they do not form an island of their own. The idea of requiring a majority is borrowed from MongoDB, and I think it is not new in computer science.
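The majority rule can be checked in isolation. The sketch below reuses the `majority`/`isMajority` arithmetic from the code that follows and asks, for each side of a partition, whether that side would down the others. The object name `KeepMajority` and the helper `sidesThatDown` are hypothetical, introduced only for this illustration.

```scala
object KeepMajority {
  // Same formula as in the answer: smallest strict majority of n (equal to n / 2 + 1).
  def majority(n: Int): Int = (n + 1) / 2 + (n + 1) % 2

  // A side downs the unreachable members only if the members it can still reach form a majority.
  def isMajority(total: Int, unreachable: Int): Boolean = {
    require(total > 0)
    require(unreachable >= 0)
    (total - unreachable) >= majority(total)
  }

  // For a partition into sides of the given sizes, report which sides would down the others.
  def sidesThatDown(sideSizes: Seq[Int]): Seq[Boolean] = {
    val total = sideSizes.sum
    sideSizes.map(size => isMajority(total, total - size))
  }
}
```

In a 5-node cluster split 3/2, `sidesThatDown(Seq(3, 2))` gives `Seq(true, false)`: only the three-node side downs the other two. In an even 2/2 split neither side has a majority, so neither downs anything and the cluster stays unconverged until the partition heals or someone intervenes. As in the answer's code, the minority side never downs anything, including itself, so this logic does not stop a singleton that is already running on the minority side.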
```scala
import akka.actor.{Actor, ActorLogging}
import akka.cluster.{Cluster, Member}
import akka.cluster.ClusterEvent._
import scala.concurrent.duration._

class ClusterListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  // Members that became unreachable while this node saw a majority
  var unreachableMember: Set[Member] = Set()

  // subscribe to cluster changes, re-subscribe when restart
  override def preStart(): Unit = {
    //#subscribe
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[UnreachableMember], classOf[ReachableMember])
    //#subscribe
  }

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive: Receive = {
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
      val state = cluster.state
      // Only a node that still sees a majority of the members downs the others
      if (isMajority(state.members.size, state.unreachable.size)) {
        scheduleTakeDown(member)
      }
    case ReachableMember(member) =>
      // The member came back before the grace period expired: do not down it
      unreachableMember = unreachableMember - member
    case _: MemberEvent => // ignore
    case "die" =>
      unreachableMember.foreach { member =>
        cluster.down(member.address)
      }
      // Forget members that have already been downed
      unreachableMember = Set.empty
  }

  // find out majority number of the group (equivalent to n / 2 + 1)
  private def majority(n: Int): Int = (n + 1) / 2 + (n + 1) % 2

  private def isMajority(total: Int, dead: Int): Boolean = {
    require(total > 0)
    require(dead >= 0)
    (total - dead) >= majority(total)
  }

  private def scheduleTakeDown(member: Member): Unit = {
    import context.dispatcher // ExecutionContext required by scheduleOnce
    unreachableMember = unreachableMember + member
    // TODO: make the 5s grace period configurable
    context.system.scheduler.scheduleOnce(5.seconds, self, "die")
  }
}
```
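The `majority` helper above is written as `(n+1)/2 + (n+1)%2`. A quick exhaustive check in plain Scala (no Akka needed; the object name `MajorityCheck` is hypothetical) suggests two things: the formula equals the simpler `n / 2 + 1`, and for any split of `n` members into two non-empty sides, at most one side passes `isMajority`, which is what prevents both sides of a two-way partition from downing each other.

```scala
object MajorityCheck {
  // Formula copied from the ClusterListener above.
  def majority(n: Int): Int = (n + 1) / 2 + (n + 1) % 2

  def isMajority(total: Int, dead: Int): Boolean = {
    require(total > 0)
    require(dead >= 0)
    (total - dead) >= majority(total)
  }

  // True if the closed form n / 2 + 1 agrees with the formula for every n in 1..maxN.
  def closedFormAgrees(maxN: Int): Boolean =
    (1 to maxN).forall(n => majority(n) == n / 2 + 1)

  // True if, for every split of n members into sides of size a and b = n - a,
  // the two sides are never both majorities (each side sees the other as dead).
  def atMostOneMajoritySide(maxN: Int): Boolean =
    (2 to maxN).forall { n =>
      (1 until n).forall { a =>
        val b = n - a
        !(isMajority(n, b) && isMajority(n, a))
      }
    }
}
```

The property holds in general, not only up to the tested bound: if both sides had at least `n / 2 + 1` members, together they would have more than `n` members.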