
How to correctly emit values to Sink from multiple Fluxes (WebsocketSession::receive) in Spring WebFlux?

In my simplified case I want to broadcast a message sent by one WebSocket client to all other clients. The application is built using reactive WebSockets with Spring.

My idea was to use a single Sink: when a message is received from a client, it is emitted on this sink. WebSocketSession::send just forwards the events emitted by this Sink to the connected clients.

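import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks

class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper: ObjectMapper) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        // Push every message received from this client into the shared sink
        val input = session.receive()
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        // Forward everything the sink emits to this client
        val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })

        return Mono.zip(input, output).then()
    }

    fun toJson(obj: Any): String = objectMapper.writeValueAsString(obj)

    fun <T> fromJson(json: String, clazz: Class<T>): T {
        return objectMapper.readValue(json, clazz)
    }
}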

This implementation is not safe, because sink.emitNext can be called concurrently from different threads.

My attempt was to use publishOn with a single-threaded Scheduler, so that onNext for all WebSocketSessions is called from a single thread. However, this does not work: one item is emitted from a WebSocket client, and then all subsequent WebSocket clients receive an onClose event immediately after connecting:

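// Additional import: reactor.core.scheduler.Schedulers
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper: ObjectMapper) : WebSocketHandler {

    private val scheduler = Schedulers.newSingle("sink-scheduler")

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .publishOn(scheduler) // publish on single-threaded scheduler
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        // output, the return value and the JSON helpers are the same as in the first version
        val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })

        return Mono.zip(input, output).then()
    }
}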


Another option I can see is to synchronize on a common lock so that emission is thread-safe:

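class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper: ObjectMapper) : WebSocketHandler {

    private val lock = Any()

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    // Only one thread at a time may emit on the shared sink
                    synchronized(lock) {
                        sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                    }
                }
                .then()
        // output, the return value and the JSON helpers are the same as in the first version
        val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })

        return Mono.zip(input, output).then()
    }
}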


However, I am not sure whether this is the right way to do it.

The question is:

Is it possible to use publishOn in this case so that emission is thread-safe? If not, what other solution is there to this problem (apart from synchronizing with the synchronized keyword, as I have done)?

Michał Krzywański, asked Dec 07 '20 17:12

3 Answers

Instead of pessimistic locking with synchronized, you could create an EmitFailureHandler comparable to FAIL_FAST, except that it returns true for EmitResult.FAIL_NON_SERIALIZED.

Concurrent emit attempts would then be retried immediately, as in a busy loop.

Optimistically, the retry ends up succeeding. To be extra defensive against infinite loops, the custom handler can also introduce a delay or limit the number of times it returns true.
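To make the busy-loop idea concrete, here is a minimal, Reactor-free sketch in plain Java. GuardedSink, tryEmit and emitWithRetry are hypothetical names invented for this illustration. GuardedSink only imitates a sink that rejects overlapping emissions with a FAIL_NON_SERIALIZED-style result instead of blocking; it is not how Reactor implements Sinks.Many. The retry loop plays the role of an EmitFailureHandler that returns true for that result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class BusySpinModel {

    enum Result { OK, FAIL_NON_SERIALIZED }

    /** Hypothetical stand-in for a sink that fails fast on concurrent use. */
    static final class GuardedSink {
        private final AtomicBoolean busy = new AtomicBoolean(false);
        private final List<String> delivered = new ArrayList<>(); // only touched while 'busy' is held

        Result tryEmit(String value) {
            if (!busy.compareAndSet(false, true)) {
                return Result.FAIL_NON_SERIALIZED; // another thread is mid-emission
            }
            try {
                delivered.add(value);
                return Result.OK;
            } finally {
                busy.set(false);
            }
        }

        int deliveredCount() {
            return delivered.size();
        }
    }

    /** Optimistic emission: retry immediately whenever the sink reports concurrent access. */
    static void emitWithRetry(GuardedSink sink, String value) {
        while (sink.tryEmit(value) == Result.FAIL_NON_SERIALIZED) {
            Thread.onSpinWait(); // hint to the runtime that this is a spin loop
        }
    }

    /** Starts 'threads' producers that each emit 'perThread' values; returns the delivered count. */
    static int run(int threads, int perThread) {
        GuardedSink sink = new GuardedSink();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    emitWithRetry(sink, id + ":" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink.deliveredCount();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000));
    }
}
```

No emission is dropped, because a rejected attempt is simply tried again. The price is CPU time spent spinning under contention, which is why bounding the retries or adding a delay can be worthwhile.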

Simon Baslé, answered Oct 28 '22 03:10

In addition to @simon-baslé's answer, here is sample code (for spring-webflux). It passes each request downstream to the subscriber and retries when the emission fails with Sinks.EmitResult.FAIL_NON_SERIALIZED. This is the Sinks.EmitFailureHandler definition:

private final Sinks.EmitFailureHandler emitFailureHandler =
        (signalType, emitResult) -> emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED;

Here is the controller that handles the request:

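import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@org.springframework.web.bind.annotation.RestController // fully qualified: the class shares the annotation's name
public class RestController {
    private final Sinks.Many<String> sink = Sinks.many().multicast().directBestEffort();
    private final Sinks.EmitFailureHandler emitFailureHandler =
            (signalType, emitResult) -> emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED;
    public RestController(ServiceSubscriber serviceSubscriber) {
        // Assumption: ServiceSubscriber implements Subscriber<String> and consumes the sink's Flux
        sink.asFlux().subscribe(serviceSubscriber);
    }

    @GetMapping(path = "/{id}")
    public Mono<ResponseEntity<Void>> getData(@PathVariable String id) {
        return Mono.fromCallable(() -> {
            sink.emitNext(id, emitFailureHandler); // retried while another thread is emitting
            return ResponseEntity.ok().<Void>build();
        });
    }
}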
Triphon Penakov, answered Oct 28 '22 03:10

The approach of using publishOn with a single-threaded scheduler should work, but you need to use the same scheduler instance for every ReactiveWebSocketHandler.
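The effect of sharing one single-threaded scheduler can be sketched without Reactor. In the plain-Java model below, a single-threaded ExecutorService stands in for the shared Scheduler, each producer thread stands in for one session's receive() stream, and a plain ArrayList stands in for the sink. SharedSchedulerModel and its members are hypothetical names for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SharedSchedulerModel {

    /** Returns { number of delivered messages, number of distinct threads that touched the sink }. */
    static int[] run(int sessions, int perSession) {
        // One executor instance shared by every "session", like one shared Scheduler
        ExecutorService shared = Executors.newSingleThreadExecutor();
        List<String> sink = new ArrayList<>(); // deliberately not thread-safe
        Set<String> emittingThreads = ConcurrentHashMap.newKeySet();

        List<Thread> receivers = new ArrayList<>();
        for (int s = 0; s < sessions; s++) {
            final int id = s;
            Thread receiver = new Thread(() -> {
                for (int i = 0; i < perSession; i++) {
                    final String message = id + ":" + i;
                    // Hand the emission over to the shared thread instead of emitting directly
                    shared.execute(() -> {
                        emittingThreads.add(Thread.currentThread().getName());
                        sink.add(message);
                    });
                }
            });
            receivers.add(receiver);
            receiver.start();
        }

        try {
            for (Thread receiver : receivers) {
                receiver.join();
            }
            shared.shutdown(); // already queued tasks still run
            if (!shared.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { sink.size(), emittingThreads.size() };
    }

    public static void main(String[] args) {
        int[] result = run(4, 1_000);
        System.out.println("delivered=" + result[0] + ", emitting threads=" + result[1]);
    }
}
```

Because every emission runs on the same thread, the unsynchronized list is never accessed concurrently. If each session created its own executor, emissions from different sessions could overlap again.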

Could you instead combine all of the receive() Fluxes using flatMap, rather than using the Sink?

My own solution to this problem takes the busy spin approach suggested by Simon.

See my answer to a similar question.

Neil Swingler, answered Oct 28 '22 05:10