
How to send custom message to custom user with spring websocket?

I'm new to Spring websocket. I want to send product changes to clients. My plan is as follows: the client creates a socket connection and subscribes to a destination:

var socket = new SockJS('/websocket');
var stompClient = Stomp.over(socket);

stompClient.connect({}, function (frame) {
    stompClient.subscribe('/product/changes', function (scoredata) {
        // We received product changes
    });
});
// Send an Ajax request to tell the server that I want to receive changes for the product with id=5.
sendAjaxRequest(5);

I've configured the Spring app as follows:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/product/");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

Now I need the following method:

@RestController
public class ProductController {

    @GetMapping("product-{id}")
    public void startSubscribe(@PathVariable("id") Long id) {

        // register current websocket session with product id and 
        // then with convertAndSendToUser send changes to current user.
    }

}

How do I implement it?

asked Feb 19 '19 09:02 by Morteza Malvandi


2 Answers

My first question would be: why are you trying to send an HTTP request to a rest controller when you have already integrated websockets with STOMP? If I understand your use case correctly, there are three solutions I can think of at the moment.

Solution 1 (socket session ↔ product id)

You can send your request directly from the client to the server over the open websocket connection. Spring can then determine which websocket session made the call, and you can implement your business logic. You need to enable another broker destination, "/queue", and set the user destination prefix, which is needed when a subscription is not intended as a broadcast. On the client side, you must also change your subscription path. Finally, you must create a class annotated with @Controller that contains the message mappings for receiving messages from the connected client.

Server Config

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/queue", "/product");  // <- added "/queue"
        registry.setApplicationDestinationPrefixes("/app");
        registry.setUserDestinationPrefix("/user");
    }
}

Server Controller

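import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

@Controller
public class WebSocketController {
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/product/register")
    public void register(@Payload Long productId, @Header("simpSessionId") String sessionId) {
        // register current websocket session with product id and
        // then with convertAndSendToUser send changes to current user.

        // Example of how to send a message to the user using the sessionId
        String response = "This could also be one of your product objects of type Product";
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);

        // Use the injected field name (the original called an undeclared "messagingTemplate")
        simpMessagingTemplate.convertAndSendToUser(sessionId, "/queue/product/changes", response, headerAccessor.getMessageHeaders());
    }
}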

Client subscription change

stompClient.subscribe('/user/queue/product/changes', function (scoredata) {
    // We received product changes
});

For detailed information you can also check this answer: https://stackoverflow.com/a/26288475/11133168
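
The "register current websocket session with product id" step above is left as a comment. As a minimal sketch of what that bookkeeping might look like, the class below keeps a map from product id to session ids. ProductSubscriptionRegistry and its method names are hypothetical (they are not part of Spring), and the sketch assumes session ids are plain strings such as the simpSessionId header value.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring class): remembers which websocket
// session ids asked for changes of which product ids.
final class ProductSubscriptionRegistry {

    // productId -> ids of the sessions interested in that product
    private final Map<Long, Set<String>> sessionsByProduct = new ConcurrentHashMap<>();

    // Record that the given session wants changes for the given product.
    void register(String sessionId, long productId) {
        sessionsByProduct
                .computeIfAbsent(productId, id -> ConcurrentHashMap.newKeySet())
                .add(sessionId);
    }

    // Forget a session for every product, e.g. after it disconnects.
    // Empty sets are left in place to keep the sketch simple.
    void unregisterSession(String sessionId) {
        sessionsByProduct.forEach((productId, sessions) -> sessions.remove(sessionId));
    }

    // Immutable snapshot of the sessions to notify when the product changes.
    Set<String> sessionsFor(long productId) {
        return Set.copyOf(sessionsByProduct.getOrDefault(productId, Set.of()));
    }
}
```

In the register method you would call register(sessionId, productId), and when a product changes you could loop over sessionsFor(productId) and call convertAndSendToUser for each session id as shown above.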


Solution 2 (principal ↔ product id)

However, if you really want to use a rest controller to start the registration, or if solution 1 just doesn't meet your requirements, have a look at the link below. Spring can also track active websocket sessions and their users through an exposed SimpUserRegistry bean. Depending on your application's security setup, you will need to configure a custom ChannelInterceptor for the client inbound channel to determine the user. Check this answer for detailed information and code examples: https://stackoverflow.com/a/45359294/11133168


Solution 3 (product id topics)

You could also subscribe to a specific product id topic so you don't even need to know which user wants to be notified about changes for a specific product.

Client subscription change

// e.g. if you want to be notified about changes for the product with id 5
stompClient.subscribe('/product/changes/5', function (scoredata) {
    // We received product changes
});

Server service example

@Service
public class WebSocketProductService{

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    // This method informs your clients about changes to a specific product.
    // A Product object should be used instead of the String parameters.
    // You have to call it yourself on product changes, or schedule it.
    public void sendProductChange(String product, String productId) {
        this.simpMessagingTemplate.convertAndSend("/product/changes/"+productId, product);
    }
}

Server Controller

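Needed if you want to manage a list of product id subscriptions. As explained in solution 1, you need a class annotated with @Controller that contains a method annotated with @SubscribeMapping. This method is called when a client subscribes to the specified path. Note that @SubscribeMapping destinations are normally resolved relative to the application destination prefix ("/app" in the config above), so check that the client's subscription path actually reaches this method.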

@Controller
public class WebSocketController {
    @SubscribeMapping("/product/changes/{productId}")
    public void productIdSubscription(@DestinationVariable Long productId) {
        //Manage your product id subscription list e.g.
    }
}
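
If you do manage a subscription list, it may help to keep the destination format in one place so that the service and the subscription handling agree on it. The following is only a sketch under that assumption: ProductDestinations, forProduct and parseProductId are hypothetical names, and the "/product/changes/" prefix is taken from the examples above.

```java
import java.util.OptionalLong;

// Hypothetical helper (not a Spring class): builds and parses the
// per-product destinations used in solution 3.
final class ProductDestinations {

    static final String PREFIX = "/product/changes/";

    private ProductDestinations() {
    }

    // Destination a client subscribes to for one product, e.g. "/product/changes/5".
    static String forProduct(long productId) {
        return PREFIX + productId;
    }

    // Extracts the product id from a destination, or returns empty if the
    // destination does not have the expected form.
    static OptionalLong parseProductId(String destination) {
        if (destination == null || !destination.startsWith(PREFIX)) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(destination.substring(PREFIX.length())));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }
}
```

The service could then send to ProductDestinations.forProduct(productId) instead of concatenating the string inline.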
answered Nov 09 '22 21:11 by FlorianDe


If you want to send product updates to users only when they request them, you can use normal HTTP requests. But I understand you want to push notifications based on user-specific business logic. For that you also need to authenticate your users, for example with Spring Security.


Solution

I propose adding this business logic to your backend using a user_product_updates(user_id, product_id) table, where each row corresponds to a product_id whose updates the user with user_id wants to subscribe to:

@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
    // Save this custom setting into your models
}

Now you can run a scheduled backend job (which can be a cron job based on the business logic of your push notifications) to send updates to your users:

@Autowired 
org.springframework.messaging.simp.SimpMessagingTemplate simpMessagingTemplate;   

@Scheduled(cron = "0 0 1 * * ?") // based on your business logic (this expression fires daily at 1:00 am)
public void scheduleTaskUsingCronExpression() {
   // loop through user_product_updates table and construct "data"
   // username is from your spring security username (principal.getName())
   simpMessagingTemplate.convertAndSendToUser(username, "/queue/products", data);
}
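
The "loop through user_product_updates table and construct data" step could be sketched roughly as follows. This is an illustration only: ProductUpdateDispatcher and Subscription are hypothetical names, the rows are assumed to be already loaded from the table, and the BiConsumer stands in for the call to convertAndSendToUser.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical helper: groups subscription rows by user and hands each
// user's product ids to a sender callback.
final class ProductUpdateDispatcher {

    // One row of the user_product_updates table; the username stands in
    // for user_id in this sketch.
    static final class Subscription {
        final String username;
        final long productId;

        Subscription(String username, long productId) {
            this.username = username;
            this.productId = productId;
        }
    }

    private ProductUpdateDispatcher() {
    }

    // username -> product ids that user subscribed to, in row order.
    static Map<String, List<Long>> groupByUser(List<Subscription> rows) {
        Map<String, List<Long>> productsByUser = new LinkedHashMap<>();
        for (Subscription row : rows) {
            productsByUser
                    .computeIfAbsent(row.username, user -> new ArrayList<>())
                    .add(row.productId);
        }
        return productsByUser;
    }

    // Calls the sender once per user with that user's product ids.
    static void dispatch(List<Subscription> rows, BiConsumer<String, List<Long>> sender) {
        groupByUser(rows).forEach(sender);
    }
}
```

Inside the scheduled method, the sender could be a lambda such as (username, ids) -> simpMessagingTemplate.convertAndSendToUser(username, "/queue/products", ids), with the ids replaced by whatever "data" you build from them.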

Going forward, you may want to add caching (especially for looking up product info by product_id) so that the job runs smoothly.
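
As a rough illustration of such a cache, the sketch below memoizes product lookups by id and allows evicting an entry when the product changes. ProductCache is a hypothetical class, and the loader function is an assumption standing in for your own repository or database call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

// Hypothetical in-memory cache for product info keyed by product id.
final class ProductCache<P> {

    private final Map<Long, P> cache = new ConcurrentHashMap<>();

    // Assumed to wrap the real lookup, e.g. a repository call.
    private final LongFunction<P> loader;

    ProductCache(LongFunction<P> loader) {
        this.loader = loader;
    }

    // Returns the cached product, loading it on first access.
    P get(long productId) {
        return cache.computeIfAbsent(productId, loader::apply);
    }

    // Drops a cached entry, e.g. after the product has changed.
    void evict(long productId) {
        cache.remove(productId);
    }
}
```

A production setup would more likely use an existing caching library with expiry, so treat this only as a way to see the idea.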


Summary

Your web-socket configuration:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app")
            .setUserDestinationPrefix("/user")
            .enableSimpleBroker("/topic", "/queue", "/product");
    }
}

Your listener in the frontend application can look like:

that.stompClient.subscribe("/user/queue/products", (message) => {
    if (message.body) {
      // We received product changes
    }
});

Users will register for product updates:

@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
    // Save to your persistence module
    // (that the particular user wants updates from such-and-such products)
}

Backend scheduler job will send updates as and when available:

@Scheduled(cron = "0 0 1 * * ?") // based on your business logic
public void scheduleTaskUsingCronExpression() {
   // loop through user_product_updates table and construct "data"
   // username is from your spring security username (principal.getName())
   simpMessagingTemplate.convertAndSendToUser(username, "/queue/products", data);
}
answered Nov 09 '22 20:11 by kukkuz