
Spring Boot SSL TCPClient ~ StompBrokerRelayMessageHandler ~ ActiveMQ ~ Undertow

I'm attempting to build a websocket messaging app based on the Spring Websocket Demo running ActiveMQ as the STOMP message broker with Undertow. The application runs fine on insecure connections. However, I'm having difficulty configuring the STOMP Broker Relay to forward with SSL connections.

As mentioned in the Spring WebSocket Docs...

The "STOMP broker relay" in the above configuration is a Spring MessageHandler that handles messages by forwarding them to an external message broker. To do so it establishes TCP connections to the broker, forwards all messages to it, and then forwards all messages received from the broker to clients through their WebSocket sessions. Essentially it acts as a "relay" that forwards messages in both directions.

Further, the docs state a dependency on reactor-net which I have...

Please add a dependency on org.projectreactor:reactor-net for TCP connection management.

The issue is that my current implementation doesn't initialize the NettyTCPClient via SSL so the ActiveMQ connection fails with an SSLException.


[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED: 
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
...
[o.a.a.b.TransportConnection.Transport:245] » 
Transport Connection to: tcp://127.0.0.1:17779 failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
...
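
The "Unrecognized SSL message, plaintext connection?" error above is what a TLS endpoint typically reports when the first bytes it receives are not a TLS record. As a rough illustration only (this is not how ActiveMQ or the JDK detect the problem, and `TlsSniffer` is a hypothetical name), the sketch below compares the start of a plaintext STOMP CONNECT frame with the start of a TLS handshake record:

```java
import java.nio.charset.StandardCharsets;

final class TlsSniffer {

    /**
     * Returns true if the bytes start like a TLS record carrying a handshake:
     * content type 0x16 followed by a major version byte of 0x03.
     */
    static boolean looksLikeTlsHandshake(byte[] firstBytes) {
        return firstBytes.length >= 2
                && firstBytes[0] == 0x16
                && firstBytes[1] == 0x03;
    }

    public static void main(String[] args) {
        // What a plaintext STOMP client sends first: the frame starts with the letter 'C'.
        byte[] stompConnect = "CONNECT\naccept-version:1.1,1.2\nhost:localhost\n\n\0"
                .getBytes(StandardCharsets.UTF_8);
        // What a TLS client sends first: a handshake record header (TLS 1.0 record version shown).
        byte[] tlsClientHello = {0x16, 0x03, 0x01, 0x00, 0x05};

        System.out.println("STOMP frame looks like TLS: " + looksLikeTlsHandshake(stompConnect));
        System.out.println("TLS record looks like TLS:  " + looksLikeTlsHandshake(tlsClientHello));
    }
}
```

In the log, the relay's TCP client connects to port 8442 without TLS, so the broker's SSL connector receives a plain STOMP frame where it expects a handshake.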

As such I've attempted to research the Project Reactor Docs to set SSL options for the connection but I haven't been successful.

At this point I've found that the StompBrokerRelayMessageHandler initializes the NettyTCPClient by default in Reactor2TcpClient, yet it doesn't appear to be configurable.

Assistance would be greatly appreciated.

SSCCE


app.props

spring.activemq.in-memory=true
spring.activemq.pooled=false
spring.activemq.broker-url=stomp+ssl://localhost:8442
server.port=8443
server.ssl.enabled=true
server.ssl.protocol=tls
server.ssl.key-alias=undertow
server.ssl.key-store=classpath:undertow.jks
server.ssl.key-store-password=xxx
server.ssl.trust-store=classpath:undertow_certs.jks
server.ssl.trust-store-password=xxx

WebSocketConfig

//... 
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    private final static String KEYSTORE = "/activemq.jks";
    private final static String KEYSTORE_PASS = "xxx";
    private final static String KEYSTORE_TYPE = "JKS";
    private final static String TRUSTSTORE = "/activemq_certs.jks";
    private final static String TRUSTSTORE_PASS = "xxx";

    private static String getBindLocation() {
        return "stomp+ssl://localhost:8442?transport.needClientAuth=false";
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public SslBrokerService activeMQBroker() throws Exception {

        final SslBrokerService service = new SslBrokerService();
        service.setPersistent(false);

        KeyManager[] km = SecurityManager.getKeyManager();
        TrustManager[] tm = SecurityManager.getTrustManager();

        service.addSslConnector(getBindLocation(), km, tm, null);
        final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test");
        service.setDestinations(new ActiveMQDestination[]{topic});

        return service;
    }


    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(8442);
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/welcome").withSockJS();
        registry.addEndpoint("/test").withSockJS();
    }

    private static class SecurityManager {
        // elided...
    }

}

SOLVED per Rossen's advice. Here are the implementation details for anyone interested.


WebSocketConfig

@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {
    ...
    @Bean
    public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
      StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
      ConfigurationReader reader = new StompClientDispatcherConfigReader();
      Environment environment = new Environment(reader).assignErrorJournal();
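      // Connect to the broker's STOMP+SSL port (8442 in the question), not the web server's port (8443).
      TcpOperations<byte[]> client = new Reactor2TcpClient<>(new StompTcpClientSpecFactory(environment,"localhost", 8442));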
      handler.setTcpClient(client);
      return handler;
    }
}

StompTcpClientSpecFactory

private static class StompTcpClientSpecFactory
        implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

    private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class);

    private final String host;
    private final int port;
    private final String KEYSTORE = "src/main/resources/tcpclient.jks";
    private final String KEYSTORE_PASS = "xxx";
    private final String KEYSTORE_TYPE = "JKS";
    private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks";
    private final String TRUSTSTORE_PASS = "xxx";
    private final String TRUSTSTORE_TYPE = "JKS";
    private final Environment environment;

    private final SecurityManager tcpManager = new SecurityManager
            .SSLBuilder(KEYSTORE, KEYSTORE_PASS)
            .keyStoreType(KEYSTORE_TYPE)
            .trustStore(TRUSTSTORE, TRUSTSTORE_PASS)
            .trustStoreType(TRUSTSTORE_TYPE)
            .build();

    public StompTcpClientSpecFactory(Environment environment, String host, int port) {
        this.environment = environment;
        this.host = host;
        this.port = port;
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
            Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {

        return tcpClientSpec
                .ssl(new SslOptions()
                        .sslProtocol("TLS")
                        .keystoreFile(tcpManager.getKeyStore())
                        .keystorePasswd(tcpManager.getKeyStorePass())
                        .trustManagers(tcpManager::getTrustManager)
                        .trustManagerPasswd(tcpManager.getTrustStorePass()))
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .env(this.environment)
                .dispatcher(this.environment.getCachedDispatchers("StompClient").get())
                .connect(this.host, this.port);
    }
}
Edward J Beckett asked Dec 17 '15 12:12


4 Answers

The StompBrokerRelayMessageHandler has a tcpClient property you can set. However, it looks like we don't expose that through the WebSocketMessageBrokerConfigurer setup.

You can remove @EnableWebSocketMessageBroker and extend DelegatingWebSocketMessageBrokerConfiguration instead. It's effectively the same but you're now extending directly from the configuration class that provides all the beans.

This allows you to then override the stompBrokerRelayMessageHandler() bean and set its TcpClient property directly. Just make sure the overriding method is marked with @Bean.

Rossen Stoyanchev answered Nov 01 '22 17:11


I needed to secure a STOMP broker relay to RabbitMQ using Spring Messaging 4.2.5 with Java 8 and found that the question's follow-up code had become outdated.

When launching my application, I provide truststore system properties to trust an internal self-signed certificate authority:

java -Djavax.net.ssl.trustStore=/etc/pki/java/server.jks -Djavax.net.ssl.trustStorePassword=xxxxx -jar build/libs/server.war
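
For readers who would rather not rely on JVM-wide system properties, here is a minimal JDK-only sketch of building an SSLContext from a truststore in code. The class and method names (`TrustStoreContexts`, `loadTrustStore`, `contextTrusting`) are hypothetical, and how such a context would be handed to Reactor's SslOptions is not covered here:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

final class TrustStoreContexts {

    /** Loads a truststore file of the given type (for example "JKS"). */
    static KeyStore loadTrustStore(Path file, String type, char[] password)
            throws GeneralSecurityException, IOException {
        KeyStore store = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(file)) {
            store.load(in, password);
        }
        return store;
    }

    /** Builds a TLS context that trusts only the certificates in the given store. */
    static SSLContext contextTrusting(KeyStore trustStore) throws GeneralSecurityException {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext context = SSLContext.getInstance("TLS");
        // No key managers (no client certificate) and the default SecureRandom.
        context.init(null, tmf.getTrustManagers(), null);
        return context;
    }
}
```

A call such as `contextTrusting(loadTrustStore(path, "JKS", password))` would then scope the trust decision to one client instead of the whole JVM.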

Per Rossen's answer, I changed

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

to

@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {

Then, in that WebSocketConfig, I provided my own AbstractBrokerMessageHandler bean:

@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
    AbstractBrokerMessageHandler handler = super.stompBrokerRelayMessageHandler();
    if (handler instanceof StompBrokerRelayMessageHandler) {
        ((StompBrokerRelayMessageHandler) handler).setTcpClient(new Reactor2TcpClient<>(
                new StompTcpFactory("127.0.0.1", 61614, true)
        ));
    }
    return handler;
}

The instanceof conditional was to simplify use of a NoOpBrokerMessageHandler in unit tests.

And finally, the following is the implementation of the StompTcpFactory used above:

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

    private final Environment environment = new Environment(new SynchronousDispatcherConfigReader());
    private final String host;
    private final int port;
    private final boolean ssl;

    public StompTcpFactory(String host, int port, boolean ssl) {
        this.host = host;
        this.port = port;
        this.ssl = ssl;
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
        return tcpClientSpec
                .env(environment)
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .ssl(ssl ? new SslOptions() : null)
                .connect(host, port);
    }

    private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
        @Override
        public ReactorConfiguration read() {
            return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties());
        }
    }

}
amoebob answered Nov 01 '22 17:11


@amoebob's answer is great, but threads aren't closed properly. Each time a client opens a connection, a new thread is created and never closed. I discovered this issue in production and spent a few days resolving it. I suggest changing StompTcpFactory as follows to improve thread reuse:

import static java.util.Collections.emptyList;

import java.util.Properties;

import io.netty.channel.EventLoopGroup;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.stomp.Reactor2StompCodec;
import org.springframework.messaging.simp.stomp.StompDecoder;
import org.springframework.messaging.simp.stomp.StompEncoder;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import reactor.Environment;
import reactor.core.config.ReactorConfiguration;
import reactor.io.net.NetStreams;
import reactor.io.net.Spec;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.netty.NettyClientSocketOptions;

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

  private final Environment environment;
  private final EventLoopGroup eventLoopGroup;
  private final String host;
  private final int port;
  private final boolean ssl;

  public StompTcpFactory(String host, int port, boolean ssl) {
    this.host = host;
    this.port = port;
    this.ssl = ssl;
    this.environment = new Environment(() -> new ReactorConfiguration(emptyList(), "sync", new Properties()));
    this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
  }

  @Override
  public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
    return tcpClientSpec
            .env(environment)
            .options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup))
            .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
            .ssl(ssl ? new SslOptions() : null)
            .connect(host, port);
  }

}
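
The idea behind the shared eventLoopGroup field can be shown with a JDK-only analogy. This is not Netty or Reactor code; it uses plain ExecutorService instances, and `ThreadReuseDemo` is a hypothetical name. Creating a new executor per "connection" starts a new thread each time, while one executor created up front reuses its thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ThreadReuseDemo {

    /** Each "connection" gets its own executor, so each one runs on a new thread. */
    static Set<String> threadsWithExecutorPerConnection(int connections) throws Exception {
        Set<String> names = new TreeSet<>();
        for (int i = 0; i < connections; i++) {
            ExecutorService perConnection = Executors.newFixedThreadPool(1);
            names.add(perConnection.submit(() -> Thread.currentThread().getName()).get());
            // Shut down only so the demo exits; never doing this is the leak.
            perConnection.shutdown();
        }
        return names;
    }

    /** All "connections" share one executor created up front, so its thread is reused. */
    static Set<String> threadsWithSharedExecutor(int connections) throws Exception {
        ExecutorService shared = Executors.newFixedThreadPool(1);
        try {
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < connections; i++) {
                results.add(shared.submit(() -> Thread.currentThread().getName()));
            }
            Set<String> names = new TreeSet<>();
            for (Future<String> result : results) {
                names.add(result.get());
            }
            return names;
        } finally {
            shared.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("per connection: " + threadsWithExecutorPerConnection(5).size() + " threads");
        System.out.println("shared:         " + threadsWithSharedExecutor(5).size() + " threads");
    }
}
```

The factory above follows the second pattern: the event loop group is created once in the constructor and passed to every client spec.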
Martin Choraine answered Nov 01 '22 16:11


For anyone looking for an updated solution, I managed to solve the issue in a cleaner way. Simply create your own TCP client with SSL and use it:

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {

  private final WebsocketProperties properties;

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/ws").setAllowedOrigins("*");
    registry.addEndpoint("/ws").withSockJS();
  }

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {

    ReactorNettyTcpClient<byte[]> tcpClient = new ReactorNettyTcpClient<>(configurer -> configurer
            .host(properties.getRelayHost())
            .port(properties.getRelayPort())
            .secure(), new StompReactorNettyCodec());

    registry.enableStompBrokerRelay("/queue", "/topic")
            .setAutoStartup(true)
            .setSystemLogin(properties.getClientLogin())
            .setSystemPasscode(properties.getClientPasscode())
            .setClientLogin(properties.getClientLogin())
            .setClientPasscode(properties.getClientPasscode())
            .setTcpClient(tcpClient);

    registry.setApplicationDestinationPrefixes("/app");
  }
}

In my case (slightly different), I created two ReactorNettyTcpClient instances as beans and pick the one with or without SSL depending on the environment.
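
One possible way to arrange that selection is with Spring profiles. The sketch below is an assumption about how it could look, not the code actually used: the class name, the `tls` profile name, and the `relay.host` / `relay.port` property keys are all hypothetical.

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;

@Configuration
class RelayTcpClientConfiguration {

  // Hypothetical property keys; substitute whatever your application defines.
  @Value("${relay.host}")
  private String relayHost;

  @Value("${relay.port}")
  private int relayPort;

  // Active only when the (hypothetical) "tls" profile is enabled.
  @Bean
  @Profile("tls")
  ReactorNettyTcpClient<byte[]> secureRelayTcpClient() {
    return new ReactorNettyTcpClient<>(client -> client
        .host(relayHost)
        .port(relayPort)
        .secure(), new StompReactorNettyCodec());
  }

  // Active in every other environment: same client without TLS.
  @Bean
  @Profile("!tls")
  ReactorNettyTcpClient<byte[]> plainRelayTcpClient() {
    return new ReactorNettyTcpClient<>(client -> client
        .host(relayHost)
        .port(relayPort), new StompReactorNettyCodec());
  }
}
```

Because exactly one of the two beans exists in any given environment, the WebSocket configuration can inject a single ReactorNettyTcpClient<byte[]> and pass it to setTcpClient without knowing which variant it received.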

Dependencies:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.4</version>
        <relativePath/>
    </parent>
    .
    .
    .
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-stomp</artifactId>
        <version>5.16.2</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>1.0.8</version>
    </dependency>

I hope anyone currently trying to resolve this issue finds it useful.

Emtec165 answered Nov 01 '22 16:11