
Why does Sinks.many().multicast().onBackpressureBuffer() complete after one of its subscribers cancels its subscription, and how can I avoid it?

I have come across a behaviour I don't understand when using a Sinks.Many<String> to notify multiple subscribers of events:

import reactor.core.publisher.Sinks

fun main() {

    val sink: Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}

This code shows the first subscriber getting the values 1 and 2, and the second subscriber getting 2. So far so good:

11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> 2
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--2> 2

Now assume the first subscriber disposes (cancels) its subscription after the first emission. I was expecting the first subscriber to get 1 and the second to get 2:


    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    d.dispose()

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)

}

Instead, the log shows the second subscriber receiving onComplete() right after subscribing:

11:51:48.684 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.685 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - cancel()
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.690 [main] INFO reactor.Flux.EmitterProcessor.1 - onComplete()

However, when the second subscriber subscribes, the flux is already considered completed. Why is this happening? I need the Sinks.Many to stay available so that subscribers can subscribe and unsubscribe at any moment without the sink terminating.

codependent asked Mar 17 '21 10:03



1 Answer

I just hit the same issue.

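It is caused by autoCancel defaulting to true: when the last remaining subscriber cancels, the sink shuts down, so any later subscriber immediately receives onComplete(). Unfortunately, the javadoc for the no-argument onBackpressureBuffer() makes no mention of this.

This behaviour is inherited from EmitterProcessor.create, where it is documented.

To set the autoCancel flag to false, use the onBackpressureBuffer overload that takes a buffer size and the autoCancel flag: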

Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
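
Applied to the Kotlin example from the question, the change might look like the sketch below. It assumes reactor-core is on the classpath; the runScenario helper and the two result lists are hypothetical names added only to make the outcome easy to inspect, and they are not part of the original code.

```kotlin
import reactor.core.publisher.Sinks
import reactor.util.concurrent.Queues

// Hypothetical helper: replays the question's scenario with autoCancel disabled
// and returns the values seen by the first and the second subscriber.
fun runScenario(): Pair<List<String>, List<String>> {
    val first = mutableListOf<String>()
    val second = mutableListOf<String>()

    // Same buffer size as the no-argument overload, but autoCancel = false,
    // so the sink is not shut down when its last subscriber cancels.
    val sink: Sinks.Many<String> = Sinks.many().multicast()
        .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false)
    val flux = sink.asFlux()

    val d = flux.subscribe { first.add(it) }
    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    // The only subscriber cancels; the sink should stay open.
    d.dispose()

    val d2 = flux.subscribe { second.add(it) }
    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
    d2.dispose()

    return first to second
}

fun main() {
    val (first, second) = runScenario()
    println("first subscriber:  $first")
    println("second subscriber: $second")
}
```

With autoCancel disabled, the second subscriber should receive 2 instead of an immediate onComplete(), which is the behaviour the question was expecting.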

Neil Swingler answered Oct 06 '22 11:10