
How do I use convertAndSendToUser() with an external broker such as RabbitMQ in Spring4?

I am trying to configure web socket support in Spring 4 using RabbitMQ as the external broker, but as soon as I switch to RabbitMQ, I get the following error on start-up in the client:

'/user/queue/changes' is not a valid destination.
Valid destination types are: /temp-queue, /exchange, /topic, /queue, /amq/queue, /reply-queue/.

On the server I am using convertAndSendToUser and this works fine with the simple broker, but as soon as I switch to RabbitMQ, I get this error. Note that RabbitMQ works fine for normal topic broadcasts - it's just the /user channel that falls over.

Do I need to do something special to get /user to work with RabbitMQ?

Edit to include Web Socket Config

My WebSocketConfig is pretty standard with a few customisations to integrate it with spring-session:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractSessionWebSocketMessageBrokerConfigurer<ExpiringSession> {

    @Override
    public void configureStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/changes").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        //config.enableSimpleBroker("/queue", "/topic");
        StompBrokerRelayRegistration r = config.enableStompBrokerRelay("/user", "/topic");

        try {
            String rabbitUrl = System.getenv("CLOUDAMQP_URL");
            if (rabbitUrl != null) {
                log.info("RABBIT URL detected: " + rabbitUrl);
                URI uri = new URI(rabbitUrl);
                String host = uri.getHost();
                String login = uri.getUserInfo().split(":", 2)[0];
                String passCode = uri.getUserInfo().split(":", 2)[1];
                String vhost = uri.getPath().substring(1);
                r.setRelayHost(host);
                r.setSystemLogin(login);
                r.setSystemPasscode(passCode);
                r.setClientLogin(login);
                r.setClientPasscode(passCode);
                r.setVirtualHost(vhost);
            }
        } catch (Exception e) {
            log.error("Error setting up RabbitMQ", e);
        }
        config.setApplicationDestinationPrefixes("/app");
    }
}
Paul Drummond asked Jan 18 '15 23:01


3 Answers

I ran into a similar error and could not find a clear solution online.

Here is what I found after going through the RabbitMQ STOMP documentation.

According to the RabbitMQ STOMP documentation, only destinations starting with /exchange, /queue, /amq/queue, /topic, and /temp-queue are allowed, so a destination such as /user/* is rejected. Choose the destination type that fits the requirements of your message:

/exchange -- SEND to arbitrary routing keys and SUBSCRIBE to arbitrary binding patterns;

/queue -- SEND and SUBSCRIBE to queues managed by the STOMP gateway;

/amq/queue -- SEND and SUBSCRIBE to queues created outside the STOMP gateway;

/topic -- SEND and SUBSCRIBE to transient and durable topics;

/temp-queue/ -- create temporary queues (in reply-to headers only).
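To see why the broker rejects the destination from the question, here is a minimal sketch of a prefix check. It only mirrors the list in the error message; it is not RabbitMQ's actual validation code, and the class name RabbitStompDestinations is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

final class RabbitStompDestinations {
    // Prefixes copied from the error message in the question (assumption:
    // a destination is valid when it starts with one of these).
    private static final List<String> VALID_PREFIXES = Arrays.asList(
            "/temp-queue/", "/exchange/", "/topic/", "/queue/", "/amq/queue/", "/reply-queue/");

    // Returns true when the destination begins with a prefix the broker lists as valid.
    static boolean isValid(String destination) {
        for (String prefix : VALID_PREFIXES) {
            if (destination.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isValid("/user/queue/changes")); // false: "/user" is not a broker prefix
        System.out.println(isValid("/queue/changes"));      // true
    }
}
```

So a "/user/..." destination has to be translated by Spring into one of these forms before it reaches RabbitMQ; relaying it as-is cannot work.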

For example, I wanted to send a message to a topic to notify all subscribers. The documentation says:

For simple topic destinations which deliver a copy of each message to all active subscribers, destinations of the form /topic/<name> can be used. Topic destinations support all the routing patterns of AMQP topic exchanges.

Messages sent to a topic destination that has no active subscribers are simply discarded.

AMQP 0-9-1 Semantics

For SEND frames, the message is sent to the amq.topic exchange with the routing key <name>.

For SUBSCRIBE frames, an autodeleted, non-durable queue is created and bound to the amq.topic exchange with routing key <name>. A subscription is created against the queue.

In other words, a STOMP SUBSCRIBE frame with destination /topic/<name> uses RabbitMQ's default topic exchange, amq.topic: a queue is created and bound to amq.topic with <name> as the routing key, if that binding does not already exist.

To create a durable subscription, the client should send its SUBSCRIBE frame with the following headers:

durable:true
auto-delete:false 
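As a rough illustration of where those headers go, the sketch below builds a raw STOMP SUBSCRIBE frame as a string. The class name StompFrames, the subscription id "sub-0", and the topic name "watchinfo-42" are made up for the example; a real client library would normally build the frame for you.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StompFrames {
    // Builds a SUBSCRIBE frame: command line, "key:value" header lines,
    // a blank line, then the NUL byte that terminates every STOMP frame.
    static String subscribeFrame(String id, String destination, boolean durable) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", id);
        headers.put("destination", destination);
        if (durable) {
            // The two headers from the answer above.
            headers.put("durable", "true");
            headers.put("auto-delete", "false");
        }
        StringBuilder frame = new StringBuilder("SUBSCRIBE\n");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            frame.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        return frame.append('\n').append('\0').toString();
    }

    public static void main(String[] args) {
        // Print the frame without the trailing NUL so it is readable on a terminal.
        String frame = subscribeFrame("sub-0", "/topic/watchinfo-42", true);
        System.out.print(frame.substring(0, frame.length() - 1));
    }
}
```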

In my app, the WebSocket server receives a message on /watch/{liveid} and replies with another message on the topic /topic/watchinfo-{liveid}:

@Secured(User.ROLE_USER)
@MessageMapping("/watch/{liveid}")
@SendTo("/topic/watchinfo-{liveid}")
@JsonView(View.Live.class)
public LiveWatchInfoMessage liveinfo(@DestinationVariable("liveid") String liveid,
                                     @AuthenticationPrincipal UserDetails activeUser) {
    ...
    return LiveWatchInfoMessage.builder().build();
}
Kane answered Sep 23 '22 20:09


This is quite an old question, but I just ran into the same error, so my solution might work for others. It's basically what @Memo 313 MediaSA said (I am not sure why his answer is downvoted). The two key points I found are:

  1. Configure only the prefixes that the message broker supports (for RabbitMQ, see https://www.rabbitmq.com/stomp.html), and use only those in the application.
  2. Optional: call setUserDestinationPrefix on the MessageBrokerRegistry when overriding configureMessageBroker to set the user destination prefix. (The default is "/user/", so this is unnecessary if you just want "/user/" as the prefix.)

After that, everything should work as the documentation for the setUserDestinationPrefix method describes:

For example when a user attempts to subscribe to "/user/queue/position-updates", the destination may be translated to "/queue/position-updatesi9oqdfzo" yielding a unique queue name that does not collide with any other user attempting to do the same. Subsequently when messages are sent to "/user/{username}/queue/position-updates", the destination is translated to "/queue/position-updatesi9oqdfzo".
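The translation described in that quote can be modelled roughly as below. This is a simplified sketch that reproduces the quoted example only: it assumes one session per user, the class and method names are hypothetical, and Spring's real resolver may use a different suffix format depending on the version.

```java
import java.util.HashMap;
import java.util.Map;

final class UserDestinationSketch {
    static final String USER_PREFIX = "/user";

    // Subscribe side: "/user/queue/x" plus the session id becomes "/queue/x<sessionId>".
    static String resolveSubscription(String destination, String sessionId) {
        if (!destination.startsWith(USER_PREFIX + "/")) {
            throw new IllegalArgumentException("Not a user destination: " + destination);
        }
        return destination.substring(USER_PREFIX.length()) + sessionId;
    }

    // Send side: "/user/{username}/queue/x" becomes "/queue/x<sessionId>",
    // using the session id registered for that username.
    static String resolveSend(String destination, Map<String, String> sessionIdByUser) {
        if (!destination.startsWith(USER_PREFIX + "/")) {
            throw new IllegalArgumentException("Not a user destination: " + destination);
        }
        String rest = destination.substring(USER_PREFIX.length() + 1); // "{username}/queue/x"
        int slash = rest.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("Missing destination after username: " + destination);
        }
        String sessionId = sessionIdByUser.get(rest.substring(0, slash));
        if (sessionId == null) {
            throw new IllegalStateException("No session for user in: " + destination);
        }
        return rest.substring(slash) + sessionId;
    }

    public static void main(String[] args) {
        Map<String, String> sessions = new HashMap<>();
        sessions.put("alice", "i9oqdfzo"); // hypothetical user and session id
        System.out.println(resolveSubscription("/user/queue/position-updates", "i9oqdfzo"));
        System.out.println(resolveSend("/user/alice/queue/position-updates", sessions));
    }
}
```

Both calls end up at the same "/queue/..." name, which is a destination type the broker accepts.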

Also, make sure the {username} your application uses matches the name of the authenticated principal.

bitShift answered Sep 22 '22 20:09


Remove "/user" from config.enableStompBrokerRelay("/user", "/topic"); and add "/queue" instead. Spring automatically adds the /user prefix when you send to a specific user.

Then just call convertAndSendToUser(username, "/queue/(what_you_want)", msg) and subscribe to "/user/queue/(what_you_want)".

The bold statement came directly from the RabbitMQ website. I tried to find the link, but I got lazy.
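The change described in this answer might look like the sketch below. It assumes Spring 5 or later, where WebSocketMessageBrokerConfigurer has default methods (on Spring 4, extend AbstractWebSocketMessageBrokerConfigurer instead, or keep the base class from the question). The class name RelayConfigSketch is hypothetical, and the relay host and credentials are left at their defaults.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class RelayConfigSketch implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Same endpoint as in the question.
        registry.addEndpoint("/changes").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // Relay only prefixes that RabbitMQ's STOMP plugin accepts; "/user" is not one of them.
        // The returned registration is where the relay host, login, and passcode
        // from the question's CLOUDAMQP_URL parsing would be applied.
        config.enableStompBrokerRelay("/queue", "/topic");

        config.setApplicationDestinationPrefixes("/app");

        // Optional: "/user/" is already the default user destination prefix.
        config.setUserDestinationPrefix("/user/");
    }
}
```

With this in place, the server can call convertAndSendToUser(username, "/queue/changes", payload) while the client keeps subscribing to "/user/queue/changes" as in the question; Spring translates that into a session-specific /queue/... destination before relaying it to RabbitMQ.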

Memo 313 MediaSA answered Sep 20 '22 20:09