We have two ActiveMQ nodes on different VMs (e.g. hosts amq1 and amq2), linked as a master/slave cluster.
We would like to connect to this cluster using the failover protocol. How can this be done? Our Spring Boot config:
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
                .setRelayHost(activeMQProperties.getRelayHost())
                .setRelayPort(activeMQProperties.getRelayPort());
    }
}
Unfortunately, this only lets us set a single host and a single port. How could we configure something like this:
failover:(stomp://amq1:61613,stomp://amq2:61613)
UPDATE: we are currently using Spring Boot 2.3.5.
I tried the suggested options with "failover" in the connection string, but they did not work, and I found some threads saying that failover is not even supported for STOMP.
So the final solution is a custom implementation on top of two ActiveMQ servers in a master/slave configuration.
Spring config (important part):
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketConfiguration.class);
    private final ActiveMQProperties activeMQProperties;
    // position in the server list; advanced round-robin on each connection attempt
    // so that the relay eventually reaches the current master ActiveMQ
    private int index = 0;

    // ActiveMQProperties is injected by Spring
    public WebsocketConfiguration(ActiveMQProperties activeMQProperties) {
        this.activeMQProperties = activeMQProperties;
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
                .setSystemHeartbeatReceiveInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
                .setSystemHeartbeatSendInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
                .setTcpClient(createTcpClient());
    }

    // custom TCP client whose remote address is looked up again for every connection attempt
    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
                client -> client.remoteAddress(socketAddressSupplier()),
                new StompReactorNettyCodec());
    }

    // returns the next configured server on each call, wrapping around at the end of the list
    private Supplier<? extends SocketAddress> socketAddressSupplier() {
        return () -> {
            index = (index + 1) % activeMQProperties.getActiveMQServerList().size();
            return new InetSocketAddress(
                    activeMQProperties.getActiveMQServerList().get(index).getRelayHost(),
                    activeMQProperties.getActiveMQServerList().get(index).getRelayPort());
        };
    }
}
The ActiveMQProperties:
activemq:
  activeMQServerList:
    - relayHost: host1
      relayPort: 61613
    - relayHost: host2
      relayPort: 61613
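For reference, a properties class matching this YAML could look roughly like the sketch below. Only the class name and the getter names (`getActiveMQServerList`, `getRelayHost`, `getRelayPort`) come from the configuration code above; the nested `Server` type and the setters are my assumptions. In the real application the class would presumably also be annotated with `@ConfigurationProperties(prefix = "activemq")` and registered as a bean; the annotation is left out here so the sketch compiles without Spring.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the properties holder. Assumption: in the Spring Boot app this class
// carries @ConfigurationProperties(prefix = "activemq") and is registered as a bean.
class ActiveMQProperties {

    // bound from the "activeMQServerList" entries in the YAML
    private List<Server> activeMQServerList = new ArrayList<>();

    public List<Server> getActiveMQServerList() {
        return activeMQServerList;
    }

    public void setActiveMQServerList(List<Server> activeMQServerList) {
        this.activeMQServerList = activeMQServerList;
    }

    // one STOMP endpoint; "Server" is a hypothetical name chosen for this sketch
    static class Server {
        private String relayHost;
        private int relayPort;

        public String getRelayHost() {
            return relayHost;
        }

        public void setRelayHost(String relayHost) {
            this.relayHost = relayHost;
        }

        public int getRelayPort() {
            return relayPort;
        }

        public void setRelayPort(int relayPort) {
            this.relayPort = relayPort;
        }
    }
}
```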
The trick is the supplier: it is asked for an address on every connection attempt. When the master ActiveMQ goes down, the supplier returns the next configured server from the list, and the relay reconnects to that one.
It works properly.
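The round-robin idea can also be pulled out into a small standalone class, which makes it easier to test without Spring. This is only a sketch: `RoundRobinAddressSupplier` is a name I made up, and it uses an `AtomicInteger` instead of a plain `int` field on the assumption that reconnect attempts might come from different threads. Unlike the configuration above, it starts with the first entry in the list.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: hands out the configured broker addresses in turn.
final class RoundRobinAddressSupplier implements Supplier<SocketAddress> {

    private final List<InetSocketAddress> addresses;
    // counts how many addresses have been handed out so far
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinAddressSupplier(List<InetSocketAddress> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        // defensive copy so later changes to the caller's list have no effect
        this.addresses = new ArrayList<>(addresses);
    }

    @Override
    public SocketAddress get() {
        // floorMod keeps the index non-negative even if the counter overflows
        int i = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(i);
    }
}
```

An instance built from the configured server list could then be passed to `client.remoteAddress(...)` in place of the inline lambda.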