I have a question about Spring's RSocketRequester. I have an RSocket server and a client. The client connects to the server and calls a @MessageMapping endpoint, and this works as expected.
But what happens if I restart the server? How can the client automatically reconnect to the RSocket server? Thanks.
Server:
@Controller
class RSC {

    @MessageMapping("pong")
    public Mono<String> pong(String m) {
        return Mono.just("PONG " + m);
    }
}
Client:
@Bean
public RSocketRequester rSocketRequester() {
    return RSocketRequester
            .builder()
            .connectTcp("localhost", 7000)
            .block();
}

@RestController
class RST {

    @Autowired
    private RSocketRequester requester;

    @GetMapping(path = "/ping")
    public Mono<String> ping() {
        return this.requester
                .route("pong")
                .data("TEST")
                .retrieveMono(String.class)
                .doOnNext(System.out::println);
    }
}
I don't think I would create an RSocketRequester bean in an application. Unlike WebClient (which has a pool of reusable connections), the RSocket requester wraps a single RSocket, i.e. a single network connection.
I think it's best to store a Mono<RSocketRequester> and subscribe to that to get an actual requester when needed. Because you don't want to create a new connection for each call, you can cache the result. Thanks to the Mono retryXYZ operators (retry, retryWhen, and so on), there are many ways you can refine the reconnection behavior.
You could try something like the following:
@Service
public class RSocketPingService {

    private final Mono<RSocketRequester> requesterMono;

    // Spring Boot is creating an auto-configured RSocketRequester.Builder bean
    public RSocketPingService(RSocketRequester.Builder builder) {
        this.requesterMono = builder
                .dataMimeType(MediaType.APPLICATION_CBOR)
                .connectTcp("localhost", 7000)
                // Re-subscribe (i.e. try to connect again) up to 5 times on error
                .retry(5)
                // Replay the connected requester to later subscribers
                .cache();
    }

    public Mono<String> ping() {
        // Resolve the cached requester, then send the request over it
        return this.requesterMono.flatMap(requester -> requester.route("pong")
                .data("TEST")
                .retrieveMono(String.class));
    }
}
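One caveat worth keeping in mind with the cached Mono above: a plain cache keeps handing out the same requester even after its underlying connection has died, which is exactly the server-restart case from the question. The sketch below is not Reactor or Spring code. It is a plain-Java illustration, using hypothetical names (`Connection`, `CachedConnection`), of the general idea of caching a single connection and re-creating it only once it is found to be closed.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CachedConnectionDemo {

    /** Hypothetical stand-in for a single network connection. */
    static final class Connection {
        final int id;
        private volatile boolean closed;

        Connection(int id) { this.id = id; }

        boolean isClosed() { return closed; }

        void close() { closed = true; }
    }

    /** Caches one connection and replaces it only when it is closed. */
    static final class CachedConnection {
        private final Supplier<Connection> factory;
        private Connection current;

        CachedConnection(Supplier<Connection> factory) {
            this.factory = factory;
        }

        synchronized Connection get() {
            // Reuse the cached connection unless it is missing or dead
            if (current == null || current.isClosed()) {
                current = factory.get();
            }
            return current;
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        CachedConnection cached =
                new CachedConnection(() -> new Connection(counter.incrementAndGet()));

        Connection first = cached.get();
        Connection again = cached.get();   // same cached instance
        first.close();                     // simulate the server going away
        Connection second = cached.get();  // a fresh connection is created

        System.out.println(first.id + " " + again.id + " " + second.id); // prints "1 1 2"
    }
}
```

In reactive code the same effect is usually obtained by letting the library manage reconnection rather than by hand-rolling a holder like this one.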
Updated for Spring Framework 5.2.6+
You could achieve it with io.rsocket.core.RSocketConnector#reconnect.
@Bean
Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rSocketRequesterBuilder) {
    return rSocketRequesterBuilder
            .rsocketConnector(connector -> connector
                    // Keep retrying (practically forever), waiting 1 second between attempts
                    .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
            .connectTcp("localhost", 7000);
}

@RestController
public class RST {

    @Autowired
    private Mono<RSocketRequester> rSocketRequesterMono;

    @GetMapping(path = "/ping")
    public Mono<String> ping() {
        // Resolve the requester on each call, then send the request over it
        return rSocketRequesterMono.flatMap(rSocketRequester ->
                rSocketRequester.route("pong")
                        .data("TEST")
                        .retrieveMono(String.class)
                        .doOnNext(System.out::println));
    }
}
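To make the fixed-delay policy concrete, here is a minimal plain-Java sketch of the idea: keep trying an action, sleeping a fixed interval between failed attempts, until it succeeds or the retry budget is used up. It does not use Reactor or RSocket; `retryWithFixedDelay` and its parameters are hypothetical names chosen for illustration, and the blocking `Thread.sleep` is only a stand-in for the non-blocking delay a reactive library would use.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayRetryDemo {

    /**
     * Runs the action, retrying up to maxRetries times after the first
     * failure and sleeping for the given delay between attempts.
     */
    static <T> T retryWithFixedDelay(Callable<T> action, int maxRetries, Duration delay)
            throws Exception {
        int failures = 0;
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                // Give up once the retry budget is exhausted
                if (failures++ >= maxRetries) {
                    throw e;
                }
                Thread.sleep(delay.toMillis());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();

        // Simulated server that refuses the first three connection attempts
        String result = retryWithFixedDelay(() -> {
            if (attempts.incrementAndGet() <= 3) {
                throw new IOException("connection refused");
            }
            return "connected";
        }, 10, Duration.ofMillis(10));

        System.out.println(result + " after " + attempts.get() + " attempts");
        // prints "connected after 4 attempts"
    }
}
```

With a very large retry count, as in the answer above, the client effectively keeps trying until the server is reachable again.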