WebFlux WebSocketClient: how to send multiple requests in the same session [design client library]

TL;DR

We are trying to design a WebSocket server using the Spring WebFlux WebSocket implementation. The server exposes the usual HTTP-style operations, e.g. create/fetch/update/fetchall. With WebSockets, we want to expose a single endpoint so that clients can use one connection for all kinds of operations, since that is what WebSockets are meant for. Is this the right design with WebFlux and WebSockets?

Long Version

We are starting a project which is going to use reactive web sockets from spring-webflux. We need to build a reactive client library which can be used by consumers to connect to the server.

On the server, we get a request, read a message, save it and return a static response:

public Mono<Void> handle(WebSocketSession webSocketSession) {
    Flux<WebSocketMessage> response = webSocketSession.receive()
            .map(WebSocketMessage::retain)
            .concatMap(webSocketMessage -> Mono.just(webSocketMessage)
                    .map(parseBinaryToEvent) //logic to get domain object
                    .flatMap(e -> service.save(e))
                    .thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
            );

    return webSocketSession.send(response);
}

On the client, we want to make a call to the server when someone calls the save method, and return the server's response to the caller.

public Mono<String> save(Event message) {
    new ReactorNettyWebSocketClient().execute(uri, session -> {
      session
              .send(Mono.just(session.binaryMessage(formatEventToMessage)))
              .then(session.receive()
                      .map(WebSocketMessage::getPayloadAsText)
                      .doOnNext(System.out::println).then()); //how to return this to client
    });
    return null;
}

We are not sure how to go about designing this. Our thoughts and open questions so far:

1) client.execute should be called only once and somehow hold the session. The same session should be used to send data in subsequent calls.

2) How to return the response from the server which we get in session.receive?

3) What about fetch, where the response received in session.receive is large (not just a static string but a list of events)?

We are doing some research, but we are unable to find proper documentation or example implementations of the WebFlux WebSocket client online. Any pointers on how to move ahead?

Asked by Mritunjay on Dec 17 '18 at 09:12


2 Answers

Please! Use RSocket!

It is an absolutely correct design, and it is worth saving resources by using only one connection per client for all possible operations.

However, don't reinvent the wheel; use a protocol that already gives you all of these kinds of communication.

  • RSocket has a request-response model, which covers the most common client-server interaction today.
  • RSocket has a request-stream communication model, so you can return a stream of events asynchronously while reusing the same connection. RSocket does all the mapping of logical streams to the physical connection and back, so you will not feel the pain of doing that yourself.
  • RSocket has further interaction models, such as fire-and-forget and stream-stream (request-channel in RSocket terms), which are useful when sending a stream of data in both directions.

How to use RSocket in Spring

One option is to use RSocket-Java, the Java implementation of the RSocket protocol. RSocket-Java is built on top of Project Reactor, so it fits naturally into the Spring WebFlux ecosystem.

Unfortunately, at the time of writing there is no full-featured integration with the Spring ecosystem. Fortunately, I spent a couple of hours on a simple RSocket Spring Boot Starter that integrates Spring WebFlux with RSocket and exposes a WebSocket RSocket server alongside the WebFlux HTTP server.

Why RSocket is a better approach?

Basically, RSocket hides the complexity of implementing the same approach yourself. With RSocket we don't have to define the interaction models as a custom protocol or implement them in Java. RSocket takes care of delivering the data to the right logical channel. It also provides a built-in client that sends messages over the same WebSocket connection, so we don't have to invent a custom implementation for that.

Make it even better with RSocket-RPC

Since RSocket is just a protocol, it does not define any message format, so that part is left to the business logic. However, there is an RSocket-RPC project which uses Protocol Buffers as the message format and reuses the same code generation technique as gRPC. So with RSocket-RPC we can easily build an API for the client and the server and not care about the transport and protocol abstraction at all.

The same RSocket Spring Boot integration provides an example of RSocket-RPC usage as well.

Alright, that has not convinced me; I still want a custom WebSocket server

So, for that purpose, you have to implement that hell yourself. I have already done that once before, but I can't point to that project since it is an enterprise one. Nevertheless, I can share a couple of code samples that can help you in building a proper client and server.

Server side

Handler and Open logical Subscribers mapping

The first point that must be taken into account is that all logical streams within one physical connection should be stored somewhere:

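class MyWebSocketRouter implements WebSocketHandler {

  // Route table: channel id -> (action type -> handler for that action)
  final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;

  MyWebSocketRouter(Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping) {
    this.channelsMapping = channelsMapping;
  }

  @Override
  public Mono<Void> handle(WebSocketSession session) {
    // Active subscriptions of this session: stream key -> handle used to cancel it
    final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
    ...
  }
}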

There are two maps in the sample above. The first one is your route mapping, which lets you identify a route based on the parameters of the incoming message. The second one is created for the request-stream use case (in my case it was a map of active subscriptions). A client sends a message frame that creates a subscription, or subscribes to a specific action, and the server keeps that subscription in the map. When the unsubscribe action arrives later, the server looks the subscription up and cancels it if it exists.
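The lifecycle of that second map can be sketched without Reactor. This is a minimal illustration under stated assumptions, not the code from the original project: `SubscriptionRegistry` is a hypothetical name, a plain `Runnable` stands in for Reactor's `Disposable`, and cancelling an existing subscription under the same key is an assumed policy (the original snippet elides what happens in that case).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks one cancel handle per logical stream,
// the role the per-session Map<String, Disposable> plays in the handler.
final class SubscriptionRegistry {

    // stream key -> action that cancels the running subscription
    private final Map<String, Runnable> active = new HashMap<>();

    // Registers a subscription. Assumed policy: an existing subscription
    // under the same key is cancelled before it is replaced.
    void subscribe(String key, Runnable cancel) {
        Runnable previous = active.put(key, cancel);
        if (previous != null) {
            previous.run();
        }
    }

    // Cancels and forgets the subscription; returns false if none was registered.
    boolean unsubscribe(String key) {
        Runnable cancel = active.remove(key);
        if (cancel == null) {
            return false;
        }
        cancel.run();
        return true;
    }

    int size() {
        return active.size();
    }
}
```

The registry is not thread-safe; like the map in the handler, it is meant to be created once per session and touched only from that session's message pipeline.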

Use Processor for messages multiplexing

In order to send back messages from all logical streams, you have to multiplex them into one stream. For example, with Reactor you can do that using UnicastProcessor (in Reactor 3.4 and later, the processor classes are deprecated in favor of the Sinks API):

@Override
public Mono<Void> handle(WebSocketSession session) {
  final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
  ...

  return Mono
    .subscriberContext()
    .flatMap(context -> Flux.merge(
      session
        .receive()
        ...
        .cast(ActionMessage.class)
        .publishOn(Schedulers.parallel())
        .doOnNext(am -> {
          switch (am.type) {
            case CREATE:
            case UPDATE:
            case CANCEL: {
              ...
            }
            case SUBSCRIBE: {
              Flux<ResponseMessage<?>> flux = Flux
                .from(
                  channelsMapping.get(am.getChannelId())
                                 .get(ActionMessage.Type.SUBSCRIBE)
                                 .handle(am) // returns Publisher<>
                );

              if (flux != null) {
                channelsIdsToDisposableMap.compute(
                  am.getChannelId() + am.getSymbol(), // you can generate a unique UUID on the client side if needed
                  (cid, disposable) -> {
                    ...

                    return flux
                      .subscriberContext(context)
                      .subscribe(
                        funIn::onNext, // send message to a Processor manually
                        e -> {
                          funIn.onNext(
                            new ResponseMessage<>( // send errors as a messages to Processor here
                              0,
                              e.getMessage(),
                              ...
                              ResponseMessage.Type.ERROR
                            )
                          );
                        }
                      );
                  }
                );
              }

              return;
            }
            case UNSUBSCRIBE: {
              // remove the entry as well, so a disposed subscription is not kept in the map
              Disposable disposable = channelsIdsToDisposableMap.remove(am.getChannelId() + am.getSymbol());

              if (disposable != null) {
                disposable.dispose();
              }
            }
          }
        })
        .then(Mono.empty()),

        funIn
            ...
            .map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
            .as(session::send)
      ).then()
    );
}

As we can see from the sample above, there are several things to take care of:

  1. The message should include route info.
  2. The message should include the unique ID of the stream it relates to.
  3. A separate Processor is needed for message multiplexing, and errors should be sent as messages as well.
  4. Each channel should be stored somewhere. Here we have a simple use case in which each message can produce either a Flux of messages or just a Mono (the Mono case can be implemented more simply on the server side, since you don't have to keep a unique stream ID).
  5. This sample does not include message encoding and decoding, so that challenge is left to you.
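To make points 1, 2 and 5 concrete, here is one possible shape for a message envelope. It is a sketch under assumptions: the class name `Frame`, its fields, the `Type` values and the pipe-delimited text layout are all hypothetical and are not what the original project used. A real implementation would more likely use JSON or a binary format.

```java
import java.util.Objects;

// Hypothetical wire envelope: every frame carries the stream it belongs to,
// the route it targets, and a type, in addition to the payload.
final class Frame {
    enum Type { REQUEST, NEXT, COMPLETE, ERROR }

    final String streamId; // unique per logical stream
    final String route;    // tells the server which handler to call
    final Type type;
    final String payload;  // application data, may itself contain '|'

    Frame(String streamId, String route, Type type, String payload) {
        this.streamId = Objects.requireNonNull(streamId);
        this.route = Objects.requireNonNull(route);
        this.type = Objects.requireNonNull(type);
        this.payload = Objects.requireNonNull(payload);
        if (streamId.indexOf('|') >= 0 || route.indexOf('|') >= 0) {
            throw new IllegalArgumentException("streamId and route must not contain '|'");
        }
    }

    // Assumed text layout: streamId|route|TYPE|payload
    String encode() {
        return streamId + "|" + route + "|" + type.name() + "|" + payload;
    }

    static Frame decode(String text) {
        // limit 4: only the first three '|' split, so the payload stays intact
        String[] parts = text.split("\\|", 4);
        if (parts.length != 4) {
            throw new IllegalArgumentException("malformed frame: " + text);
        }
        return new Frame(parts[0], parts[1], Type.valueOf(parts[2]), parts[3]);
    }
}
```

With such an envelope, the server can pick the handler from `route`, and the client can match every response to its caller through `streamId`.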

Client side

The client is not that simple either:

Handle session

To handle the connection, we have to allocate two processors, which we will later use to multiplex and demultiplex messages:

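UnicastProcessor<> outgoing = ...
UnicastProcessor<> incoming = ...
(session) -> {
  return Flux.merge(
     session.receive()
            .subscribeWith(incoming) // demultiplex: every inbound message goes through `incoming`
            .then(Mono.empty()),
     session.send(outgoing)          // multiplex: every outbound message comes from `outgoing`
  ).then();
}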

Keep all logical streams somewhere

All created streams, whether Mono or Flux, should be stored somewhere so that we can tell which stream an incoming message relates to:

Map<String, MonoSink> monoSinksMap = ...;
Map<String, FluxSink> fluxSinksMap = ...;

We have to keep two maps, since MonoSink and FluxSink do not share a parent interface.

Message Routing

In the above samples, we just considered the initial part of the client side. Now we have to build a message routing mechanism:

...
.subscribeWith(incoming)
.doOnNext(message -> {
    if (monoSinksMap.containsKey(message.getStreamId())) {
        // single-response stream: complete or fail the waiting Mono and forget it
        MonoSink sink = monoSinksMap.remove(message.getStreamId());
        if (message.getType() == SUCCESS) {
            sink.success(message.getData());
        }
        else {
            sink.error(message.getCause());
        }
    } else if (fluxSinksMap.containsKey(message.getStreamId())) {
        // multi-response stream: keep the sink until COMPLETE or an error arrives
        FluxSink sink = fluxSinksMap.get(message.getStreamId());
        if (message.getType() == NEXT) {
            sink.next(message.getData());
        }
        else if (message.getType() == COMPLETE) {
            fluxSinksMap.remove(message.getStreamId());
            sink.next(message.getData());
            sink.complete();
        }
        else {
            fluxSinksMap.remove(message.getStreamId());
            sink.error(message.getCause());
        }
    }
})

The above code sample shows how we can route incoming messages.
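The same routing idea can be tried out in isolation with only the JDK. The sketch below is an assumption-laden stand-in, not the Reactor code above: `ResponseRouter` and `StreamListener` are hypothetical names, `CompletableFuture` plays the role of `MonoSink`, a small callback interface plays the role of `FluxSink`, and, unlike the snippet above, a COMPLETE message carries no data here.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical router: dispatches each incoming message to whoever is
// waiting on its stream id, and forgets streams once they terminate.
final class ResponseRouter {
    enum Type { SUCCESS, NEXT, COMPLETE, ERROR }

    // Callback stand-in for a FluxSink
    interface StreamListener {
        void onNext(String data);
        void onComplete();
        void onError(String cause);
    }

    private final Map<String, CompletableFuture<String>> single = new ConcurrentHashMap<>();
    private final Map<String, StreamListener> streams = new ConcurrentHashMap<>();

    // Registers a caller that expects exactly one response.
    CompletableFuture<String> expectOne(String streamId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        single.put(streamId, future);
        return future;
    }

    // Registers a caller that expects a stream of responses.
    void expectMany(String streamId, StreamListener listener) {
        streams.put(streamId, listener);
    }

    // Returns true if the message was delivered to a waiting caller.
    boolean route(String streamId, Type type, String data) {
        CompletableFuture<String> future = single.remove(streamId);
        if (future != null) {
            if (type == Type.SUCCESS) {
                future.complete(data);
            } else {
                // anything other than SUCCESS fails the single-response caller
                future.completeExceptionally(new IllegalStateException(data));
            }
            return true;
        }
        StreamListener listener = streams.get(streamId);
        if (listener == null) {
            return false; // unknown or already finished stream
        }
        switch (type) {
            case NEXT:
                listener.onNext(data);
                break;
            case COMPLETE:
                streams.remove(streamId);
                listener.onComplete();
                break;
            default:
                streams.remove(streamId);
                listener.onError(data);
        }
        return true;
    }

    int pending() {
        return single.size() + streams.size();
    }
}
```

Removing the entry as soon as a stream terminates is the important part: without it, the maps grow for the lifetime of the connection.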

Multiplex requests

The final part is message multiplexing. For that purpose, here is a possible implementation of a sender class:

class Sender {
    UnicastProcessor<> outgoing = ...
    UnicastProcessor<> incoming = ...

    Map<String, MonoSink> monoSinksMap = ...;
    Map<String, FluxSink> fluxSinksMap = ...;

    public Sender() {
        // create the WebSocket connection here and put the code mentioned earlier
    }

    Mono<R> sendForMono(T data) {
        // generate a message with a unique stream ID
        return Mono.<R>create(sink -> {
            monoSinksMap.put(streamId, sink);
            outgoing.onNext(message); // send message to server only when subscribed to Mono
        });
    }

    Flux<R> sendForFlux(T data) {
        // generate a message with a unique stream ID
        return Flux.<R>create(sink -> {
            fluxSinksMap.put(streamId, sink);
            outgoing.onNext(message); // send message to server only when subscribed to Flux
        });
    }
}

Summary of the custom implementation

  1. Hardcore
  2. No Backpressure support implemented so that could be another challenge
  3. Easy to shoot yourself in the foot

Takeaways

  1. PLEASE, use RSocket, don't invent a protocol yourself, it is HARD!!!
  2. To learn more about RSocket from Pivotal guys - https://www.youtube.com/watch?v=WVnAbv65uCU
  3. To learn more about RSocket from one of my talks - https://www.youtube.com/watch?v=XKMyj6arY2A
  4. There is a full-featured framework built on top of RSocket called Proteus - you might be interested in that - https://www.netifi.com/
  5. To learn more about Proteus from a core developer of the RSocket protocol - https://m.youtube.com/watch?v=_rqQtkIeNIQ
Answered by Oleh Dokuka on Oct 14 '22 at 17:10


Not sure if this is your problem, but I see that you are sending a static Flux response (a stream that completes and closes). You need an open stream to send messages to that session. For example, you can create a processor:

public class SocketMessageComponent {

    // hot source that application code pushes outgoing messages into
    private final DirectProcessor<String> emitterProcessor;
    // shared view of the processor that the WebSocket handler subscribes to
    private final Flux<String> subscriber;

    public SocketMessageComponent() {
        emitterProcessor = DirectProcessor.create();
        subscriber = emitterProcessor.share();
    }

    public Flux<String> getSubscriber() {
        return subscriber;
    }

    public void sendMessage(String message) {
        emitterProcessor.onNext(message);
    }
}

and then you can send messages through it:

public Mono<Void> handle(WebSocketSession webSocketSession) {
    this.webSocketSession = webSocketSession;
    return webSocketSession.send(socketMessageComponent.getSubscriber()
            .map(webSocketSession::textMessage))
            .and(webSocketSession.receive()
                    .map(WebSocketMessage::getPayloadAsText).log());
}
Answered by Ricard Kollcaku on Oct 14 '22 at 18:10