We are trying to design a WebSocket server using the Spring WebFlux WebSocket implementation. The server has the usual HTTP server operations, e.g. create/fetch/update/fetchall. Using WebSockets, we want to expose one endpoint so that clients can use a single connection for all kinds of operations, given that WebSockets are meant for this purpose. Is this the right design with WebFlux and WebSockets?
We are starting a project which is going to use reactive WebSockets from spring-webflux. We need to build a reactive client library that consumers can use to connect to the server.
On the server, we get a request, read a message, save it and return a static response:
public Mono<Void> handle(WebSocketSession webSocketSession) {
    Flux<WebSocketMessage> response = webSocketSession.receive()
        .map(WebSocketMessage::retain)
        .concatMap(webSocketMessage -> Mono.just(webSocketMessage)
            .map(parseBinaryToEvent) // logic to get domain object
            .flatMap(e -> service.save(e))
            .thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
        );
    return webSocketSession.send(response);
}
On the client, we want to make a call when someone calls the save method and return the response from the server.
public Mono<String> save(Event message) {
    new ReactorNettyWebSocketClient().execute(uri, session -> {
        session
            .send(Mono.just(session.binaryMessage(formatEventToMessage)))
            .then(session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(System.out::println).then()); // how to return this to client
    });
    return null;
}
We are not sure how to go about designing this. Ideally, we think:
1) client.execute should be called only once and somehow hold the session. The same session should be used to send data in subsequent calls.
2) How do we return the response from the server that we get in session.receive?
3) What about fetch, where the response is huge (not just a static string but a list of events) in session.receive?
We are doing some research, but we are unable to find proper resources for webflux-websocket-client documentation/implementation online. Any pointers on how to move ahead?
It is absolutely the correct design, and it is worth saving resources by using only one connection per client for all possible operations.
However, don't reinvent the wheel; use a protocol that gives you all of these kinds of communication.
One of the options to do so is using the RSocket-Java implementation of the RSocket protocol. RSocket-Java is built on top of Project Reactor, so it naturally fits the Spring WebFlux ecosystem.
Unfortunately, there is no full-featured integration with the Spring ecosystem. Fortunately, I spent a couple of hours putting together a simple RSocket Spring Boot Starter that integrates Spring WebFlux with RSocket and exposes a WebSocket RSocket server along with the WebFlux HTTP server.
Basically, RSocket hides the complexity of implementing the same approach yourself. With RSocket we don't have to define the interaction model as a custom protocol or implement it in Java. RSocket takes care of delivering the data to a particular logical channel for us. It provides a built-in client that sends messages over the same WS connection, so we don't have to invent a custom implementation for that.
Since RSocket is just a protocol, it does not provide any message format, so that challenge is left to the business logic. However, there is an RSocket-RPC project which uses Protocol Buffers as the message format and reuses the same code generation technique as gRPC does. So, using RSocket-RPC, we can easily build an API for the client and server and care less about the transport and protocol abstraction.
The same RSocket Spring Boot integration provides an example of RSocket-RPC usage as well.
If you stay with plain WebSockets instead, you have to implement all of that yourself. I have already done that once before, but I can't point to that project since it is an enterprise one. Nevertheless, I can share a couple of code samples that can help you in building a proper client and server.
The first point that must be taken into account is that all logical streams within one physical connection should be stored somewhere:
class MyWebSocketRouter implements WebSocketHandler {
    final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
        ...
    }
}
There are two maps in the sample above. The first is the routes mapping, which lets you identify a route based on the incoming message parameters. The second is created for the request-stream use case (in my case it was a map of active subscriptions): you can send a message frame that creates a subscription, or subscribes you to a specific action, and keep that subscription so that, once the unsubscribe action is executed, you are unsubscribed if the subscription exists.
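The second map can be illustrated in isolation. The following is only a minimal sketch using plain JDK types rather than Reactor: Cancellable is a hypothetical stand-in for Reactor's Disposable, and SubscriptionRegistry is an illustrative name, not part of the original project. It assumes the registry is touched by one thread at a time, since it wraps a plain HashMap.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Reactor's Disposable: a handle that cancels one subscription.
interface Cancellable {
    void cancel();
}

// Tracks the active logical subscriptions of ONE WebSocket session.
class SubscriptionRegistry {
    private final Map<String, Cancellable> active = new HashMap<>();

    // Registers a subscription under the key, cancelling any previous one with the same key.
    void subscribe(String key, Cancellable subscription) {
        Cancellable previous = active.put(key, subscription);
        if (previous != null) {
            previous.cancel();
        }
    }

    // Cancels and forgets the subscription; returns false if none existed.
    boolean unsubscribe(String key) {
        Cancellable existing = active.remove(key);
        if (existing == null) {
            return false;
        }
        existing.cancel();
        return true;
    }

    int size() {
        return active.size();
    }
}
```

Unlike a bare get followed by dispose, this sketch also removes the entry on unsubscribe, so the map does not grow with stale handles. Whether a repeated subscribe should replace or reject the existing subscription is a design choice; replacing is assumed here.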
In order to send back messages from all logical streams, you have to multiplex them into one stream. For example, with Reactor, you can do that using UnicastProcessor:
@Override
public Mono<Void> handle(WebSocketSession session) {
    final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
    ...
    return Mono
        .subscriberContext()
        .flatMap(context -> Flux.merge(
            session
                .receive()
                ...
                .cast(ActionMessage.class)
                .publishOn(Schedulers.parallel())
                .doOnNext(am -> {
                    switch (am.type) {
                        case CREATE:
                        case UPDATE:
                        case CANCEL: {
                            ...
                        }
                        case SUBSCRIBE: {
                            Flux<ResponseMessage<?>> flux = Flux
                                .from(
                                    channelsMapping.get(am.getChannelId())
                                        .get(ActionMessage.Type.SUBSCRIBE)
                                        .handle(am) // returns Publisher<>
                                );
                            if (flux != null) {
                                channelsIdsToDisposableMap.compute(
                                    am.getChannelId() + am.getSymbol(), // you can generate a unique UUID on the client side if needed
                                    (cid, disposable) -> {
                                        ...
                                        return flux
                                            .subscriberContext(context)
                                            .subscribe(
                                                funIn::onNext, // send message to a Processor manually
                                                e -> {
                                                    funIn.onNext(
                                                        new ResponseMessage<>( // send errors as messages to the Processor here
                                                            0,
                                                            e.getMessage(),
                                                            ...
                                                            ResponseMessage.Type.ERROR
                                                        )
                                                    );
                                                }
                                            );
                                    }
                                );
                            }
                            return;
                        }
                        case UNSUBSCRIBE: {
                            Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());
                            if (disposable != null) {
                                disposable.dispose();
                            }
                        }
                    }
                })
                .then(Mono.empty()),
            funIn
                ...
                .map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
                .as(session::send)
        ).then()
    );
}
As we can see from the sample above, there are a number of things going on there. Each message can produce a Flux of messages or just a Mono (in the case of a Mono, it can be implemented more simply on the server side, so you don't have to keep a unique stream ID).
The client is not that simple either:
To handle the connection, we have to allocate two processors so that we can later use them to multiplex and demultiplex messages:
UnicastProcessor<> outgoing = ...
UnicastProcessor<> incoming = ...
(session) -> {
    return Flux.merge(
        session.receive()
            .subscribeWith(incoming)
            .then(Mono.empty()),
        session.send(outgoing)
    ).then();
}
All created streams, whether Mono or Flux, should be stored somewhere so that we can tell which stream an incoming message relates to:
Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;
We have to keep two maps, since MonoSink and FluxSink do not share a parent interface.
In the above samples, we just considered the initial part of the client side. Now we have to build a message routing mechanism:
...
.subscribeWith(incoming)
.doOnNext(message -> {
    if (monoSinksMap.containsKey(message.getStreamId())) {
        MonoSink sink = monoSinksMap.get(message.getStreamId());
        monoSinksMap.remove(message.getStreamId());
        if (message.getType() == SUCCESS) {
            sink.success(message.getData());
        }
        else {
            sink.error(message.getCause());
        }
    } else if (fluxSinksMap.containsKey(message.getStreamId())) {
        FluxSink sink = fluxSinksMap.get(message.getStreamId());
        if (message.getType() == NEXT) {
            sink.next(message.getData());
        }
        else if (message.getType() == COMPLETE) {
            fluxSinksMap.remove(message.getStreamId());
            sink.next(message.getData());
            sink.complete();
        }
        else {
            fluxSinksMap.remove(message.getStreamId());
            sink.error(message.getCause());
        }
    }
})
The above code sample shows how we can route incoming messages.
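The same routing idea can be tried out without Reactor. What follows is only a rough sketch in plain Java under stated assumptions: Frame, FrameType, StreamListener, CollectingListener and IncomingRouter are all hypothetical names introduced for illustration, StreamListener stands in for FluxSink, and a COMPLETE frame is assumed to carry no payload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical frame shape; the real message class is not shown in full above.
enum FrameType { NEXT, COMPLETE, ERROR }

final class Frame {
    final String streamId;
    final FrameType type;
    final String data;

    Frame(String streamId, FrameType type, String data) {
        this.streamId = streamId;
        this.type = type;
        this.data = data;
    }
}

// Hypothetical stand-in for FluxSink: receives the events of one logical stream.
interface StreamListener {
    void onNext(String data);
    void onComplete();
    void onError(String cause);
}

// Simple listener that records what it receives, handy for experiments.
class CollectingListener implements StreamListener {
    final List<String> events = new ArrayList<>();
    public void onNext(String data) { events.add("next:" + data); }
    public void onComplete() { events.add("complete"); }
    public void onError(String cause) { events.add("error:" + cause); }
}

class IncomingRouter {
    private final Map<String, StreamListener> listeners = new ConcurrentHashMap<>();

    void register(String streamId, StreamListener listener) {
        listeners.put(streamId, listener);
    }

    // Dispatches a frame to its stream; terminal frames also remove the stream.
    // Returns false when the stream id is unknown (for example, already terminated).
    boolean route(Frame frame) {
        StreamListener listener = frame.type == FrameType.NEXT
                ? listeners.get(frame.streamId)
                : listeners.remove(frame.streamId);
        if (listener == null) {
            return false;
        }
        switch (frame.type) {
            case NEXT: listener.onNext(frame.data); break;
            case COMPLETE: listener.onComplete(); break;
            default: listener.onError(frame.data); break;
        }
        return true;
    }

    int openStreams() {
        return listeners.size();
    }
}
```

Removing the listener on a terminal frame in the same step as the lookup keeps late frames for a finished stream from being delivered.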
The final part is message multiplexing. For that purpose, here is a possible Sender class implementation:
class Sender {
    UnicastProcessor<> outgoing = ...
    UnicastProcessor<> incoming = ...
    Map<String, MonoSink> monoSinksMap = ...;
    Map<String, FluxSink> fluxSinksMap = ...;

    public Sender() {
        // create websocket connection here and put code mentioned earlier
    }

    Mono<R> sendForMono(T data) {
        // generate a message with a unique stream id
        return Mono.<R>create(sink -> {
            monoSinksMap.put(streamId, sink);
            outgoing.onNext(message); // send message to server only when subscribed to Mono
        });
    }

    Flux<R> sendForFlux(T data) {
        return Flux.<R>create(sink -> {
            fluxSinksMap.put(streamId, sink);
            outgoing.onNext(message); // send message to server only when subscribed to Flux
        });
    }
}
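To make the request/response correlation concrete, here is a rough, dependency-free sketch. It swaps Reactor's MonoSink for the JDK's CompletableFuture, so unlike Mono.create it sends eagerly rather than on subscription. RequestCorrelator and its transport callback are hypothetical names; in a real client the callback would push into the outgoing stream and the receive loop would call onResponse or onError.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Correlates requests and responses that share one connection.
class RequestCorrelator {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Hypothetical transport hook: receives (streamId, payload) and writes it to the socket.
    private final BiConsumer<String, String> transport;

    RequestCorrelator(BiConsumer<String, String> transport) {
        this.transport = transport;
    }

    // Sends a payload and returns a future completed when the matching response arrives.
    CompletableFuture<String> send(String payload) {
        String streamId = Long.toString(nextId.incrementAndGet());
        CompletableFuture<String> response = new CompletableFuture<>();
        pending.put(streamId, response); // register before sending so a fast reply is not lost
        transport.accept(streamId, payload);
        return response;
    }

    // Called by the receive loop for a successful response; false if the id is unknown.
    boolean onResponse(String streamId, String data) {
        CompletableFuture<String> response = pending.remove(streamId);
        return response != null && response.complete(data);
    }

    // Called by the receive loop for an error response; false if the id is unknown.
    boolean onError(String streamId, String cause) {
        CompletableFuture<String> response = pending.remove(streamId);
        return response != null && response.completeExceptionally(new IllegalStateException(cause));
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Responses may arrive in any order; each one completes only the future registered under its own stream id, which is what lets many callers share a single connection.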
I'm not sure whether this is your problem, but I see that you are sending a static Flux response (a stream that completes and closes). You need an open stream to send messages to that session; for example, you can create a processor:
public class SocketMessageComponent {
    private DirectProcessor<String> emitterProcessor;
    private Flux<String> subscriber;

    public SocketMessageComponent() {
        emitterProcessor = DirectProcessor.create();
        subscriber = emitterProcessor.share();
    }

    public Flux<String> getSubscriber() {
        return subscriber;
    }

    public void sendMessage(String message) {
        emitterProcessor.onNext(message);
    }
}
and then you can send:
public Mono<Void> handle(WebSocketSession webSocketSession) {
    this.webSocketSession = webSocketSession;
    return webSocketSession.send(socketMessageComponent.getSubscriber()
            .map(webSocketSession::textMessage))
        .and(webSocketSession.receive()
            .map(WebSocketMessage::getPayloadAsText).log());
}