
How to support WebSocket transport with Spring Boot RSocket server?

TLDR: What is required to configure a Spring Boot application that exposes an RSocket interface that supports the WebSocket transport?


I'm learning about RSocket and Spring Boot at the same time, so please bear with me.

In my struggles, I have been able to build a very simple and contrived implementation of a Spring Boot application that consumes an API exposed by a second Spring Boot application using RSocket as the protocol. However, I am only able to achieve this when using the TcpClientTransport.

From my perspective, the WebSocket transport is much more likely to be used, and more useful, for client->server architectures. However, I haven't found any working examples or documentation on how to properly configure a Spring Boot application that accepts RSocket messages using WebSocket as the transport.

The odd part is that in my tests my consumer (client) does appear to establish a WebSocket connection to the server/producer, but the 'handshake' appears to hang and the connection is never fully established. I've tested with both the JavaScript libraries (rsocket-websocket-client, rsocket-rpc-core, etc.) and the Java libraries (io.rsocket.transport.netty.client.WebsocketClientTransport), and the server exhibits the same behavior regardless.

To reiterate, using the TcpClientTransport I am able to connect to the server and invoke requests just fine. However, when using the WebsocketClientTransport the connection is never established.

What is required of a Spring Boot application that aims to support RSocket via the WebsocketClientTransport, beyond consuming spring-boot-starter-rsocket as a dependency?

Server


pom.xml

...

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.0.M5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

...

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

...

application.properties

spring.rsocket.server.port=8081
management.endpoints.enabled-by-default=true
management.endpoints.web.exposure.include=*

SpringBootRSocketServerApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootRSocketServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootRSocketServerApplication.class, args);
    }
}

UserRSocketController

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;

@Slf4j
@Controller
public class UserRSocketController {

    @Autowired
    private UserRepository userRepository;

    @MessageMapping("usersList")
    public Mono<List<User>> usersList() {
        log.info("Handling usersList request.");
        return Mono.just(this.userRepository.getUsers());
    }

    @MessageMapping("usersStream")
    Flux<User> usersStream(UserStreamRequest request) {
        log.info("Handling request for usersStream.");
        List<User> users = userRepository.getUsers();
        Stream<User> userStream = Stream.generate(() -> {
            Random rand = new Random();
            return users.get(rand.nextInt(users.size()));
        });
        return Flux.fromStream(userStream).delayElements(Duration.ofSeconds(1));
    }

    @MessageMapping("userById")
    public Mono<User> userById(GetUserByIdRequest request) {
        log.info("Handling request for userById id: {}.", request.getId());
        return Mono.just(this.userRepository.getUserById(request.getId()));
    }
}

Startup Logging

 :: Spring Boot ::             (v2.2.0.M5)

2019-09-08 21:40:02,986 INFO  [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRSocketServerApplication on REDACTED with PID 22540 (REDACTED)
2019-09-08 21:40:02,988 INFO  [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
2019-09-08 21:40:04,103 INFO  [main] org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver: Exposing 14 endpoint(s) beneath base path '/actuator'
2019-09-08 21:40:04,475 INFO  [main] org.springframework.boot.rsocket.netty.NettyRSocketServer: Netty RSocket started on port(s): 8081
2019-09-08 21:40:04,494 INFO  [main] org.springframework.boot.web.embedded.netty.NettyWebServer: Netty started on port(s): 8080
2019-09-08 21:40:04,498 INFO  [main] org.springframework.boot.StartupInfoLogger: Started SpringBootRSocketServerApplication in 1.807 seconds (JVM running for 2.883)

Consumer/Client


ClientConfiguration.java

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
//import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        // ClientTransport transport = TcpClientTransport.create(8081);
        // ^--- TCPTransport works fine

        ClientTransport transport = WebsocketClientTransport.create(8081);
        // ^--- Connection hangs and application startup stalls

        return RSocketFactory
            .connect()
            .mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(transport)
            .start()
            .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}

Startup Logging

 :: Spring Boot ::             (v2.2.0.M5)

2019-09-08 21:40:52,331 INFO  [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRsocketConsumerApplication on REDACTED with PID 18904 (REDACTED)
2019-09-08 21:40:52,334 INFO  [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default

Vigs asked Sep 09 '19 02:09



2 Answers

You only need two things to have an RSocket application exposing endpoints using the websocket transport:

First, you need both webflux and rsocket dependencies as you'll probably need to serve web pages and static resources as well:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>

Then you need to configure the RSocket server accordingly in your application.properties file:

#server.port=8080 this is already the default
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket

You'll find more about that in the Spring Boot reference documentation about RSocket.
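
One plausible explanation for the hanging handshake in the question (an inference from the protocol, not something confirmed by the asker): spring.rsocket.server.transport defaults to tcp, and RSocket over TCP prefixes every frame with a 24-bit big-endian length. A server in TCP mode that receives a WebSocket client's HTTP upgrade request would read the first three bytes, "GET", as a frame length and wait for that many bytes. A small sketch of the arithmetic follows; the class name and the request line are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes the 24-bit big-endian length prefix
// that RSocket's TCP transport puts in front of each frame.
final class TcpFrameLengthDemo {

    static int frameLength(byte[] bytes) {
        return ((bytes[0] & 0xFF) << 16) | ((bytes[1] & 0xFF) << 8) | (bytes[2] & 0xFF);
    }

    public static void main(String[] args) {
        // First line of a WebSocket upgrade request, as a TCP-mode server would see it.
        byte[] upgradeRequest = "GET /rsocket HTTP/1.1\r\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(frameLength(upgradeRequest)); // prints 4670804
    }
}
```

Under that reading, the server would sit waiting for roughly 4.6 MB of frame data that never arrives, which is consistent with a connection that opens but never completes.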

The websocket client can now connect to ws://localhost:8080/rsocket.
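
Before wiring up an RSocket client, it can help to check that the endpoint actually completes the WebSocket upgrade. The following is a minimal diagnostic sketch using only the JDK 11+ HTTP client. The class name, the default URL, and the timeouts are assumptions for illustration. It does not speak RSocket; it only verifies the HTTP to WebSocket handshake.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical diagnostic: returns true if the endpoint answers the WebSocket upgrade.
final class WsHandshakeProbe {

    static boolean canUpgrade(URI uri, Duration timeout) {
        HttpClient client = HttpClient.newHttpClient();
        try {
            WebSocket ws = client.newWebSocketBuilder()
                    .connectTimeout(timeout)
                    .buildAsync(uri, new WebSocket.Listener() { })
                    .get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            ws.abort(); // handshake succeeded; drop the connection without sending RSocket frames
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            // A refused connection, a timeout, or a failed upgrade all end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        URI uri = URI.create(args.length > 0 ? args[0] : "ws://localhost:8080/rsocket");
        System.out.println(uri + " upgrade ok: " + canUpgrade(uri, Duration.ofSeconds(3)));
    }
}
```

If the probe reports a failure against a port where the TCP transport works, the server is most likely not exposing RSocket over WebSocket on that port and path.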

Note that as of the current 2.2.0 SNAPSHOTs, the RSocket protocol has evolved and the rsocket-js library is currently catching up, especially in the metadata support. You'll find a working sample here as well.

On the Java client side of things, Spring Boot provides you with an RSocketRequester.Builder that's already configured and customized to your needs with codecs and interceptors:

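import java.net.URI;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Component;

@Component
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder builder) {
        // Connects over WebSocket; block() waits for the connection during startup.
        this.rsocketRequester = builder
                .connectWebSocket(URI.create("ws://localhost:8080/rsocket"))
                .block();
    }
}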

Brian Clozel answered Oct 19 '22 00:10


Based on this blog post, the correct port to connect to is the one configured via server.port (8080 here), not the one configured via spring.rsocket.server.port.

Server Config

server.port=8080
spring.rsocket.server.port=8081
spring.rsocket.server.mapping-path=/ws
spring.rsocket.server.transport=websocket
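
To make the relationship between this configuration and the client URL explicit, here is a small JDK-only sketch that derives the WebSocket URL from the properties above. The class name, the method name, and the fallback values are assumptions for illustration, not Spring Boot behavior. It follows this answer's finding that the client uses server.port and the mapping path, and ignores spring.rsocket.server.port.

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.util.Properties;

// Hypothetical helper: builds the client URL from the server's properties.
final class RSocketWsUrl {

    static URI fromServerConfig(Properties props, String host) {
        // Fallbacks ("8080" and "/") are assumptions of this sketch.
        String port = props.getProperty("server.port", "8080");
        String path = props.getProperty("spring.rsocket.server.mapping-path", "/");
        return URI.create("ws://" + host + ":" + port + path);
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(String.join("\n",
                "server.port=8080",
                "spring.rsocket.server.port=8081",
                "spring.rsocket.server.mapping-path=/ws",
                "spring.rsocket.server.transport=websocket")));
        System.out.println(fromServerConfig(props, "127.0.0.1")); // prints ws://127.0.0.1:8080/ws
    }
}
```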

Java Consumer Client Configuration

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;

import java.net.URI;

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        URI websocketUri = URI.create("ws://127.0.0.1:8080/ws");
        WebsocketClientTransport ws = WebsocketClientTransport.create(websocketUri);
        return RSocketFactory
            .connect()
            .mimeType(
                MetadataExtractor.ROUTING.toString(),
                MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(ws)
            .start()
            .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(
            rSocket(),
            MimeTypeUtils.APPLICATION_JSON,
            MetadataExtractor.ROUTING,
            rSocketStrategies);
    }
}

JavaScript Client Configuration

import { RSocketClient, JsonSerializers } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';

const transport = new RSocketWebSocketClient({
    url: 'ws://127.0.0.1:8080/ws'
});

const client = new RSocketClient({
    // send/receive JSON objects instead of strings/buffers
    serializers: JsonSerializers,
    setup: {
        // ms between keepalive frames sent to the server
        keepAlive: 60000,

        // ms timeout if no keepalive response
        lifetime: 180000,

        // format of `data`
        dataMimeType: 'application/json',

        // format of `metadata`
        metadataMimeType: 'application/json',
    },
    transport,
});

client.connect().then((rsocket) => {
    // work with rsocket
});

Vigs answered Oct 18 '22 23:10