TLDR: What is required to configure a Spring Boot application that exposes an RSocket interface that supports the WebSocket transport?
I'm learning about RSocket and Spring Boot at the same time, so please bear with me.
In my struggles, I have been able to build a very simple and contrived implementation of a Spring Boot application that consumes an API provided/exposed by a second Spring Boot application using RSocket as the protocol. However, I am only able to achieve this when using the TcpClientTransport
.
From my perspective, the WebsocketTransport
is much more likely to be used and more useful for client->server architectures, however, I haven't found any working examples or documentation on how to properly configure a Spring Boot application that accepts RSocket messages using WebSocket as the transport.
The odd part is that in my tests it appears that my consumer (client) does establish a WebSocket connection to the server/producer, however, the 'handshake' appears to hang and the connection is never fully established. I've tested with both the JavaScript libraries (rsocket-websocket-client, rsocket-rpc-core, etc), and the Java libraries (io.rsocket.transport.netty.client.WebsocketClientTransport) and the server appears to exhibit the same behavior regardless.
To reiterate, using the TCPTransport I am able to connect to the server and invoke requests just fine, however when using the WebsocketTransport
the connection is never established.
What is required of a Spring Boot application that aims to support RSocket via the WebsocketClientTransport
, past consuming spring-boot-starter-rsocket
as a dependency?
...
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.M5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
...
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
...
spring.rsocket.server.port=8081
management.endpoints.enabled-by-default=true
management.endpoints.web.exposure.include=*
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the server application.
 */
@SpringBootApplication
public class SpringBootRSocketServerApplication {

    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments, passed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication application =
                new SpringApplication(SpringBootRSocketServerApplication.class);
        application.run(args);
    }
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
/**
 * Message handlers for the {@code usersList}, {@code usersStream} and {@code userById}
 * routes, all backed by the injected {@link UserRepository}.
 */
@Slf4j
@Controller
public class UserRSocketController {

    @Autowired
    private UserRepository userRepository;

    /**
     * Handles the {@code usersList} route.
     *
     * @return a {@link Mono} emitting the repository's full user list
     */
    @MessageMapping("usersList")
    public Mono<List<User>> usersList() {
        log.info("Handling usersList request.");
        return Mono.just(this.userRepository.getUsers());
    }

    /**
     * Handles the {@code usersStream} route by emitting a randomly chosen user once per
     * second, without end.
     *
     * @param request the stream request; not read by this handler
     * @return an unbounded {@link Flux} of users, or an empty {@link Flux} when the
     *         repository has no users
     */
    @MessageMapping("usersStream")
    Flux<User> usersStream(UserStreamRequest request) {
        log.info("Handling request for usersStream.");
        List<User> users = userRepository.getUsers();
        if (users.isEmpty()) {
            // Nothing to pick from: complete right away instead of failing in nextInt(0).
            return Flux.empty();
        }
        // One generator per stream rather than a fresh Random for every emitted element.
        Random rand = new Random();
        Stream<User> userStream = Stream.generate(() -> users.get(rand.nextInt(users.size())));
        return Flux.fromStream(userStream).delayElements(Duration.ofSeconds(1));
    }

    /**
     * Handles the {@code userById} route.
     *
     * @param request carries the id of the requested user
     * @return a {@link Mono} emitting the matching user, or an empty {@link Mono} when the
     *         repository returns {@code null} for that id
     */
    @MessageMapping("userById")
    public Mono<User> userById(GetUserByIdRequest request) {
        log.info("Handling request for userById id: {}.", request.getId());
        // justOrEmpty: a null lookup result becomes an empty Mono instead of an NPE.
        return Mono.justOrEmpty(this.userRepository.getUserById(request.getId()));
    }
}
:: Spring Boot :: (v2.2.0.M5)
2019-09-08 21:40:02,986 INFO [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRSocketServerApplication on REDACTED with PID 22540 (REDACTED)
2019-09-08 21:40:02,988 INFO [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
2019-09-08 21:40:04,103 INFO [main] org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver: Exposing 14 endpoint(s) beneath base path '/actuator'
2019-09-08 21:40:04,475 INFO [main] org.springframework.boot.rsocket.netty.NettyRSocketServer: Netty RSocket started on port(s): 8081
2019-09-08 21:40:04,494 INFO [main] org.springframework.boot.web.embedded.netty.NettyWebServer: Netty started on port(s): 8080
2019-09-08 21:40:04,498 INFO [main] org.springframework.boot.StartupInfoLogger: Started SpringBootRSocketServerApplication in 1.807 seconds (JVM running for 2.883)
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
//import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
/**
 * Client-side wiring: opens an {@link RSocket} connection to port 8081 and exposes an
 * {@link RSocketRequester} on top of it.
 */
@Configuration
public class ClientConfiguration {

    /**
     * Opens the RSocket connection and blocks until it is established.
     *
     * <p>The setup announces {@link MetadataExtractor#ROUTING} as the metadata MIME type and
     * JSON as the data MIME type.
     *
     * @return the connected {@link RSocket}
     */
    @Bean
    public RSocket rSocket() {
        // ClientTransport transport = TcpClientTransport.create(8081);
        // ^--- TCPTransport works fine
        ClientTransport transport = WebsocketClientTransport.create(8081);
        // ^--- Connection hangs and application startup stalls
        return RSocketFactory
                .connect()
                .mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(transport)
                .start()
                .block();
    }

    /**
     * Wraps the connection from {@link #rSocket()} in an {@link RSocketRequester}.
     *
     * @param rSocketStrategies strategies handed to the requester
     * @return a requester using JSON for data and the routing MIME type for metadata
     */
    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        // The metadata MIME type has to be the one rSocket() announced when connecting
        // (ROUTING), not JSON; otherwise requester and connection disagree on metadata.
        return RSocketRequester.wrap(
                rSocket(),
                MimeTypeUtils.APPLICATION_JSON,
                MetadataExtractor.ROUTING,
                rSocketStrategies);
    }
}
:: Spring Boot :: (v2.2.0.M5)
2019-09-08 21:40:52,331 INFO [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRsocketConsumerApplication on REDACTED with PID 18904 (REDACTED)
2019-09-08 21:40:52,334 INFO [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
To build the WebSocket server-side, we will utilize the Spring Boot framework which significantly speeds up the development of standalone and web applications in Java. Spring Boot includes the spring-WebSocket module, which is compatible with the Java WebSocket API standard (JSR-356).
RELEASE is used because, at the time of writing, this version has the most production-ready RSocket features.
Overview. RSocket is an application protocol for multiplexed, duplex communication over TCP, WebSocket, and other byte stream transports, using one of the following interaction models: Request-Response — send one message and receive one back. Request-Stream — send one message and receive a stream of messages back.
The Websocket transport is available by installing rsocket using one of the extra features: In order to use a Websocket transport, instantiate a TransportAioHttpWebsocket or TransportQuartWebsocket and pass it to an RSocketServer instance. There are a few helpers to ease the creation of a server or a client.
Leaving your existing terminal window open, in a second terminal window, make the rsocket-server folder your current directory. Then build and run the RSocket server using the following command:./mvnw clean package spring-boot:run -DskipTests=true Alternatively, you can use the “Build” and “Run” commands in your Java IDE if you prefer.
rsocket-py supports multiple transport protocols. The Websocket transport is available by installing rsocket using one of the extra features: In order to use a Websocket transport, instantiate a TransportAioHttpWebsocket or TransportQuartWebsocket and pass it to an RSocketServer instance.
If you are new to RSocket, take a look at these articles first. Our application has 2 endpoints. Whenever the server receives a number “N”, it emits numbers from 1 to N with a 1 second delay. So for every request, the server might send multiple responses back via a single WebSocket connection.
You only need two things to have an RSocket application exposing endpoints using the websocket transport:
First, you need both webflux and rsocket dependencies as you'll probably need to serve web pages and static resources as well:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
Then you need to configure the RSocket server accordingly in your application.properties
file:
#server.port=8080 this is already the default
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket
You'll find more about that in the Spring Boot reference documentation about RSocket.
The websocket client can now connect to ws://localhost:8080/rsocket
.
Note that as of the current 2.2.0 SNAPSHOTs, the RSocket protocol has evolved and the rsocket-js library is currently catching up, especially in the metadata support. You'll find a working sample here as well.
On the Java client side of things, Spring Boot provides you with a RSocketRequester.Builder
that's already configured and customized to your needs with codecs and interceptors:
/**
 * Service that owns an {@link RSocketRequester} connected over WebSocket.
 */
@Component
public class MyService {

    private final RSocketRequester rsocketRequester;

    /**
     * Connects to {@code ws://localhost:8080/rsocket} and blocks until the requester is ready.
     *
     * @param builder requester builder used to open the WebSocket connection
     */
    public MyService(RSocketRequester.Builder builder) {
        URI endpoint = URI.create("ws://localhost:8080/rsocket");
        this.rsocketRequester = builder.connectWebSocket(endpoint).block();
    }
}
Based on this blog post the correct port to connect to is the port that is configured via server.port=8080
.
Server Config
server.port=8080
spring.rsocket.server.port=8081
spring.rsocket.server.mapping-path=/ws
spring.rsocket.server.transport=websocket
Java Consumer Client Configuration
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
import java.net.URI;
import java.time.Duration;
/**
 * Client-side wiring: connects to {@code ws://127.0.0.1:8080/ws} over WebSocket and exposes
 * an {@link RSocketRequester} on top of that connection.
 */
@Configuration
public class ClientConfiguration {

    /**
     * Opens the RSocket connection over WebSocket and blocks until it is established.
     *
     * @return the connected {@link RSocket}
     */
    @Bean
    public RSocket rSocket() {
        WebsocketClientTransport transport =
                WebsocketClientTransport.create(URI.create("ws://127.0.0.1:8080/ws"));

        // Routing MIME type for metadata, JSON for data.
        String metadataMimeType = MetadataExtractor.ROUTING.toString();
        String dataMimeType = MimeTypeUtils.APPLICATION_JSON_VALUE;

        return RSocketFactory.connect()
                .mimeType(metadataMimeType, dataMimeType)
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(transport)
                .start()
                .block();
    }

    /**
     * Wraps the connection from {@link #rSocket()} in an {@link RSocketRequester}, using the
     * same data and metadata MIME types as the connection setup.
     *
     * @param rSocketStrategies strategies handed to the requester
     * @return the requester bound to the shared connection
     */
    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        RSocket connection = rSocket();
        return RSocketRequester.wrap(
                connection, MimeTypeUtils.APPLICATION_JSON, MetadataExtractor.ROUTING, rSocketStrategies);
    }
}
JavaScript Client Configuration
import { RSocketClient, JsonSerializers } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
// WebSocket transport pointed at the server's endpoint.
const transport = new RSocketWebSocketClient({ url: 'ws://127.0.0.1:8080/ws' });

// Connection setup values, kept separate so the client construction stays short.
const setup = {
  keepAlive: 60000, // ms btw sending keepalive to server
  lifetime: 180000, // ms timeout if no keepalive response
  dataMimeType: 'application/json', // format of `data`
  metadataMimeType: 'application/json', // format of `metadata`
};

const client = new RSocketClient({
  // send/receive JSON objects instead of strings/buffers
  serializers: JsonSerializers,
  setup,
  transport,
});

// Open the connection; the callback receives the connected socket.
client.connect().then((rsocket) => {
  // work with rsocket
});
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!
Donate Us With