I am looking for an example or usage of the ClusterListener that would help us optimize a service integrated with the MongoDB Java client and improve its debugging information.
How can we use it effectively with our MongoDB cluster, which is set up with replication?
TL;DR
The ClusterListener interface can be used to monitor some aspects of a replica set. If you want to dig deeper, or to interrogate the replica set status outside of the events for which ClusterListener provides callbacks, you might prefer to invoke the replSetGetStatus command and inspect its output.
Detail
The ClusterListener provides callbacks which allow you to watch and respond to changes to your replica set. For example, the following ClusterListener ...
import com.mongodb.connection.ServerDescription;
import com.mongodb.event.ClusterClosedEvent;
import com.mongodb.event.ClusterDescriptionChangedEvent;
import com.mongodb.event.ClusterListener;
import com.mongodb.event.ClusterOpeningEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingClusterListener implements ClusterListener {
    private static final Logger logger = LoggerFactory.getLogger(LoggingClusterListener.class);

    @Override
    public void clusterOpening(final ClusterOpeningEvent clusterOpeningEvent) {
        logger.info("clusterOpening: {}", clusterOpeningEvent.getClusterId().getValue());
    }

    @Override
    public void clusterClosed(final ClusterClosedEvent clusterClosedEvent) {
        logger.info("clusterClosed: {}", clusterClosedEvent.getClusterId().getValue());
    }

    @Override
    public void clusterDescriptionChanged(final ClusterDescriptionChangedEvent event) {
        logger.info("clusterDescriptionChanged: {}", event.getClusterId().getValue());
        // one line per server: type / address / connection state
        for (ServerDescription sd : event.getNewDescription().getServerDescriptions()) {
            logger.info("{} / {} / {}", sd.getType(), sd.getCanonicalAddress(), sd.getState().name());
        }
    }
}
... when associated with a MongoClient like this ...
final MongoClientOptions options = MongoClientOptions.builder()
        .addClusterListener(new LoggingClusterListener())
        .build();
return new MongoClient(serverAddresses, options);
... will emit the following logging:
// cluster starting up ...
2017-08-17 12:49:55,977 [main] clusterOpening: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostB:27017] clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostC:27017] clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_OTHER / hostB:27017 / CONNECTED
2017-08-17 12:49:56,077 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_OTHER / hostC:27017 / CONNECTED
2017-08-17 12:49:56,077 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_SECONDARY / hostA:27017 / CONNECTED
// ... the primary fails over to hostA:27017
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_OTHER / hostB:27017 / CONNECTED
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_SECONDARY / hostC:27017 / CONNECTED
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017] REPLICA_SET_PRIMARY / hostA:27017 / CONNECTED
2017-08-17 12:50:07,126 [main] clusterClosed: 599582e36d47c231ec963b0b
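If the main thing you want from the listener is to notice a failover, one option is to compare the primary before and after each clusterDescriptionChanged event (the event exposes getPreviousDescription() as well as getNewDescription()). The sketch below shows only the comparison step. It uses plain maps of server address to server type as a stand-in for the driver's ClusterDescription, and PrimaryChangeDetector with its two methods are hypothetical names, not part of the driver.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: not part of the MongoDB driver.
final class PrimaryChangeDetector {

    // Returns the address of the server whose type is REPLICA_SET_PRIMARY, if any.
    static Optional<String> findPrimary(Map<String, String> serverTypesByAddress) {
        return serverTypesByAddress.entrySet().stream()
                .filter(entry -> "REPLICA_SET_PRIMARY".equals(entry.getValue()))
                .map(Map.Entry::getKey)
                .findFirst();
    }

    // Returns a message when the primary differs between the two snapshots,
    // or an empty Optional when it is unchanged.
    static Optional<String> describeChange(Map<String, String> previous, Map<String, String> current) {
        Optional<String> before = findPrimary(previous);
        Optional<String> after = findPrimary(current);
        if (before.equals(after)) {
            return Optional.empty();
        }
        return Optional.of("primary changed: " + before.orElse("none") + " -> " + after.orElse("none"));
    }
}
```

Inside a real listener the two maps would be built from the server descriptions of the previous and new cluster descriptions, and the message would be logged or turned into an alert.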
Perhaps this is sufficient for you, but the listener only fires when the cluster opens, closes or changes its description (the three callbacks shown above). If you want to monitor replica set status actively, you might prefer to sample the status periodically and report/log/alert on the results. You can do this by executing the replSetGetStatus command, which returns a BsonDocument (its format is described in the MongoDB documentation for replSetGetStatus) that can be interrogated and logged.
Logging the status document is the simplest response, but that approach could be extended into the basis of a monitoring solution by raising alerts based on the document's contents.
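As a rough illustration of what such alert rules might look like, the sketch below checks three conditions: a member whose health is not 1, a member whose replication lag exceeds a threshold, and a replica set with no primary. The Member class is a hypothetical, simplified view of one entry in the status document's members array, and the rules and threshold are assumptions chosen for the example rather than recommendations.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical alert rules over values read from the replica set status document.
final class ReplicaSetAlerts {

    // Simplified, hypothetical view of one entry in the "members" array.
    static final class Member {
        final String name;
        final double health;
        final String state;
        final long lagSeconds;

        Member(String name, double health, String state, long lagSeconds) {
            this.name = name;
            this.health = health;
            this.state = state;
            this.lagSeconds = lagSeconds;
        }
    }

    // Returns one message per rule violation; an empty list means nothing to report.
    static List<String> evaluate(List<Member> members, long maxLagSeconds) {
        List<String> alerts = new ArrayList<>();
        boolean hasPrimary = false;
        for (Member member : members) {
            if ("PRIMARY".equals(member.state)) {
                hasPrimary = true;
            }
            if (member.health != 1.0) {
                alerts.add(member.name + " is unhealthy (health=" + member.health + ")");
            }
            if (member.lagSeconds > maxLagSeconds) {
                alerts.add(member.name + " is lagging by " + member.lagSeconds + "s");
            }
        }
        if (!hasPrimary) {
            alerts.add("replica set has no PRIMARY");
        }
        return alerts;
    }
}
```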
The following code reads the replica set status document, interrogates it (including calculating the replication lag) and logs the output.
MongoReplicaSetStatusLogger mongoReplicaSetStatusLogger = new MongoReplicaSetStatusLogger();
// periodically ...
MongoClient mongoClient = getMongoClient();
MongoDatabase admin = mongoClient.getDatabase("admin");
BsonDocument commandResult = admin.runCommand(new BsonDocument("replSetGetStatus", new BsonInt32(1)), BsonDocument.class);
mongoReplicaSetStatusLogger.report(commandResult);
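The "// periodically ..." step above is left open. One way to drive it, sketched here with only the JDK, is a single-threaded ScheduledExecutorService. PeriodicSampler is a hypothetical name, and the Runnable passed to start() stands in for the runCommand-and-report calls shown above. The wrapper catches runtime exceptions because a scheduled task that throws is otherwise never run again.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that runs a sampling task at a fixed delay on one background thread.
final class PeriodicSampler implements AutoCloseable {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Runs the task immediately and then again 'period' after each run completes.
    void start(Runnable sample, long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                sample.run();
            } catch (RuntimeException e) {
                // Swallow the failure: an exception escaping here would cancel all future runs.
                System.err.println("Sampling failed: " + e.getMessage());
            }
        }, 0, period, unit);
    }

    // Stops the background thread; no further samples are taken.
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

With the code above, the task could be something like () -> mongoReplicaSetStatusLogger.report(admin.runCommand(...)), sampled every 30 seconds or so; the interval is an arbitrary example.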
Here's the MongoReplicaSetStatusLogger implementation:
import org.bson.BsonDocument;
import org.bson.BsonInvalidOperationException;
import org.bson.BsonNumber;
import org.bson.BsonValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;

public class MongoReplicaSetStatusLogger {
    private static final Logger logger = LoggerFactory.getLogger(MongoReplicaSetStatusLogger.class);
    // NOTE: SimpleDateFormat is not thread-safe, so call report() from one thread at a time
    private static final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss,SSSZ");
    private static final String DEFAULT_VALUE = "UNKNOWN";
    private static final String MEMBERS = "members";

    // logs one line per member of the replica set
    public void report(BsonDocument replicasetStatusDocument) {
        if (hasMembers(replicasetStatusDocument)) {
            replicasetStatusDocument.getArray(MEMBERS).stream()
                    .filter(BsonValue::isDocument)
                    .map(BsonValue::asDocument)
                    .forEach(this::logMemberDocument);
        } else {
            logger.warn("The replica set status document does not contain a '{}' attribute; perhaps there has been " +
                    "a MongoDB upgrade and the format has changed.", MEMBERS);
        }
    }

    private boolean hasMembers(BsonDocument replicasetStatusDocument) {
        return replicasetStatusDocument.containsKey(MEMBERS) && replicasetStatusDocument.get(MEMBERS).isArray();
    }

    private void logMemberDocument(BsonDocument memberDocument) {
        StringBuilder stringBuilder = new StringBuilder()
                .append(logAttribute("node", getStringValue(memberDocument, "name")))
                .append(logAttribute("health", getNumericValue(memberDocument, "health")))
                .append(logAttribute("state", getStringValue(memberDocument, "stateStr")))
                .append(logAttribute("uptime(s)", getNumericValue(memberDocument, "uptime")))
                .append(logAttribute("lastOptime", getDateTimeValue(memberDocument, "optimeDate")))
                .append(logAttribute("lastHeartbeat", getDateTimeValue(memberDocument, "lastHeartbeat")))
                .append(logAttribute("lastHeartbeatRecv", getDateTimeValue(memberDocument, "lastHeartbeatRecv")))
                .append(logAttribute("ping(ms)", getNumericValue(memberDocument, "pingMs")))
                .append(logAttribute("replicationLag(ms)", getReplicationLag(memberDocument)));
        // routine status report, so INFO rather than ERROR
        logger.info(stringBuilder.toString());
    }

    private String logAttribute(String key, Optional<String> value) {
        return key + "=" + value.orElse(DEFAULT_VALUE) + "|";
    }

    private Optional<String> getStringValue(BsonDocument memberDocument, String key) {
        if (memberDocument.containsKey(key)) {
            try {
                return Optional.of(memberDocument.getString(key).getValue().toUpperCase());
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading: {} from replica set status document due to: {}!", key, e.getMessage());
            }
        }
        return Optional.empty();
    }

    private Optional<String> getNumericValue(BsonDocument memberDocument, String key) {
        if (memberDocument.containsKey(key)) {
            try {
                BsonNumber bsonNumber = memberDocument.getNumber(key);
                if (bsonNumber.isInt32()) {
                    return Optional.of(Integer.toString(bsonNumber.intValue()));
                } else if (bsonNumber.isInt64()) {
                    return Optional.of(Long.toString(bsonNumber.longValue()));
                } else if (bsonNumber.isDouble()) {
                    return Optional.of(Double.toString(bsonNumber.doubleValue()));
                }
            } catch (BsonInvalidOperationException e) {
                // getNumber() throws if the value is present but not numeric
                logger.warn("Exception reading: {} from replica set status document due to: {}!", key, e.getMessage());
            }
        }
        return Optional.empty();
    }

    private Optional<String> getDateTimeValue(BsonDocument memberDocument, String key) {
        if (memberDocument.containsKey(key)) {
            try {
                return Optional.of(dateFormatter.format(new Date(memberDocument.getDateTime(key).getValue())));
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading: {} from replica set status document due to: {}!", key, e.getMessage());
            }
        }
        return Optional.empty();
    }

    // lag in milliseconds between the member's last heartbeat and its last applied operation
    private Optional<String> getReplicationLag(BsonDocument memberDocument) {
        if (memberDocument.containsKey("optimeDate") && memberDocument.containsKey("lastHeartbeat")) {
            try {
                long optimeDate = memberDocument.getDateTime("optimeDate").getValue();
                long lastHeartbeat = memberDocument.getDateTime("lastHeartbeat").getValue();
                long replicationLag = lastHeartbeat - optimeDate;
                return Optional.of(Long.toString(replicationLag));
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading 'optimeDate' or 'lastHeartbeat' from replica set status document due to: {}!", e.getMessage());
            }
        }
        return Optional.empty();
    }
}
Here's an example of the output:
2017-08-17 15:44:35,192|[main]|INFO|MongoReplicaSetStatusLogger|node=hostA:27017|health=1.0|state=PRIMARY|uptime(s)=21|lastOptime=2017-08-17T15:43:32,000+0100|lastHeartbeat=UNKNOWN|lastHeartbeatRecv=UNKNOWN|ping(ms)=UNKNOWN|replicationLag(ms)=UNKNOWN|
2017-08-17 15:44:35,193|[main]|INFO|MongoReplicaSetStatusLogger|node=hostB:27017|health=1.0|state=SECONDARY|uptime(s)=17|lastOptime=2017-08-17T15:43:20,000+0100|lastHeartbeat=2017-08-17T15:43:35,443+0100|lastHeartbeatRecv=2017-08-17T15:43:36,412+0100|ping(ms)=0|replicationLag(ms)=15443|
2017-08-17 15:44:35,193|[main]|INFO|MongoReplicaSetStatusLogger|node=hostC:27017|health=1.0|state=SECONDARY|uptime(s)=17|lastOptime=2017-08-17T15:43:20,000+0100|lastHeartbeat=2017-08-17T15:43:35,444+0100|lastHeartbeatRecv=2017-08-17T15:43:36,470+0100|ping(ms)=0|replicationLag(ms)=15444|
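The replicationLag value above is derived from each member's own lastHeartbeat and optimeDate. A common alternative is to measure how far each secondary's optimeDate trails the primary's optimeDate. The sketch below shows that calculation on plain epoch-millisecond values; ReplicationLag and lagSecondsBehindPrimary are hypothetical names, and in practice the inputs would be read from the members array of the status document.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: lag of each secondary relative to the primary's last applied operation.
final class ReplicationLag {

    // Both arguments hold optimeDate values as epoch milliseconds.
    // Returns whole seconds of lag per secondary, never negative.
    static Map<String, Long> lagSecondsBehindPrimary(long primaryOptimeMillis,
                                                     Map<String, Long> secondaryOptimeMillis) {
        Map<String, Long> lagByNode = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : secondaryOptimeMillis.entrySet()) {
            long lagMillis = Math.max(0L, primaryOptimeMillis - entry.getValue());
            lagByNode.put(entry.getKey(), lagMillis / 1000);
        }
        return lagByNode;
    }
}
```

Applied to the sample output above (primary lastOptime 15:43:32, both secondaries 15:43:20), this approach would report a lag of 12 seconds for hostB and hostC.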