Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring boot - how to connect to external ActiveMQ master/slave cluster with failover URL

We have 2 ActiveMQ nodes on different VMs (e.g. hosts: amq1, amq2). They are linked as master/slave cluster.

We would like to connect to this cluster with failover protocol. How can this be done? Spring boot config:

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
        .setRelayHost(activeMQProperties.getRelayHost())
        .setRelayPort(activeMQProperties.getRelayPort());
  }
}

Unfortunately here we have only the possibility to set one host and one port. How could we set something like this:

failover:(stomp://amq1:61613,stomp://amq2:61613)

UPDATE: currently Spring Boot 2.3.5 is used

like image 351
Karbert Avatar asked Jan 26 '18 12:01

Karbert


1 Answers

I've tried the options mentioned by you with "failover" in the connection string but it did not work, and found some threads that it is not even supported for stomp.

So the final solution looks like an own implementation: two ActiveMQ servers with master-slave configurations.

Spring config (important part):

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
    private final Logger LOGGER = LoggerFactory.getLogger(WebsocketConfiguration.class);
    
    private final ActiveMQProperties activeMQProperties;
    
    // used by own round-robin implementation to connect to the current master ActiveMQ
    private int index = 0;
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
            .setSystemHeartbeatReceiveInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
            .setSystemHeartbeatSendInterval(STOMP_SYSTEM_HEARTBEAT_INTERVAL)
            .setTcpClient(createTcpClient());
    }
    
    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
            client -> client.remoteAddress(socketAddressSupplier()),
            new StompReactorNettyCodec());
    }

    private Supplier<? extends SocketAddress> socketAddressSupplier() {

        Supplier<? extends SocketAddress> socketAddressSupplier = () -> {
            index++;
            if (index >= activeMQProperties.getActiveMQServerList().size()) {
                index = 0;
            }
            return new InetSocketAddress(activeMQProperties.getActiveMQServerList().get(index).getRelayHost(),
                activeMQProperties.getActiveMQServerList().get(index).getRelayPort());
        };

        return socketAddressSupplier;
    }
}

The ActiveMQProperties:

activemq:                              
    activeMQServerList:
      -
        relayHost: host1
        relayPort: 61613
      -
        relayHost: host2
        relayPort: 61613

The trick is the supplier. When the master ActiveMQ goes down, the supplier will return the next configured server from the list and reconnects to that.

It works properly.

like image 70
Karbert Avatar answered Dec 03 '22 10:12

Karbert