I have come across a behaviour I don't understand when using Sinks.Many<String>
to publish events to multiple subscribers:
fun main() {
val sink : Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
val flux = sink.asFlux().log()
val d = flux.subscribe {
println("--> $it")
}
sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
val d2 = flux.subscribe {
println("--2> $it")
}
sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
This code shows the first subscriber getting the values 1 and 2, and the second subscriber getting 2. So far so good:
11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> 2
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--2> 2
Now assume the first subscriber disposes (cancels) its subscription after the first emission. I was expecting the first subscriber to get 1 and the second to get 2:
fun main() {
val sink : Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer()
val flux = sink.asFlux().log()
val d = flux.subscribe {
println("--> $it")
}
sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
d.dispose()
val d2 = flux.subscribe {
println("--2> $it")
}
sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}
11:51:48.684 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.685 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - cancel()
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.690 [main] INFO reactor.Flux.EmitterProcessor.1 - onComplete()
However, when the second subscriber subscribes, the flux is already considered completed. Why is this happening? I need the Sinks.Many to stay available so that subscribers can subscribe and unsubscribe at any time without the sink being cancelled.
public static interface Sinks.Many<T> extends Scannable. A base interface for standalone Sinks with Flux semantics. The sink can be exposed to consuming code as a Flux via its asFlux() view. Author: Simon Baslé, Stephane Maldini.
In Reactor a sink is a class that allows safe manual triggering of signals in a standalone fashion, creating a Publisher-like structure capable of dealing with multiple Subscribers (with the exception of unicast() flavors).
I just hit the same issue.
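It is caused by autoCancel defaulting to true: when the last subscriber cancels, the sink itself shuts down, so any later subscriber only receives onComplete. Unfortunately the onBackpressureBuffer javadoc makes no mention of it.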
This behaviour is inherited from EmitterProcessor.create where it is documented.
To set the autoCancel flag to false, use the onBackpressureBuffer overload that takes a buffer size and the autoCancel flag:
Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
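For reference, here is a minimal Kotlin sketch of the scenario from the question with autoCancel set to false. The function name runWithAutoCancelDisabled and the two collecting lists are made up for this illustration; the Reactor calls are the same ones used above, and reactor-core is assumed to be on the classpath.

```kotlin
import reactor.core.publisher.Sinks
import reactor.util.concurrent.Queues

// Hypothetical helper: replays the question's scenario with autoCancel = false
// and returns what each subscriber received.
fun runWithAutoCancelDisabled(): Pair<List<String>, List<String>> {
    val first = mutableListOf<String>()
    val second = mutableListOf<String>()

    // Same buffer size as the no-arg overload, but the sink stays alive
    // after its last subscriber cancels.
    val sink: Sinks.Many<String> = Sinks.many().multicast()
        .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false)
    val flux = sink.asFlux()

    val d = flux.subscribe { first.add(it) }
    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)
    d.dispose() // the only subscriber cancels; the sink is not shut down

    val d2 = flux.subscribe { second.add(it) }
    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
    d2.dispose()

    return first to second
}

fun main() {
    val (first, second) = runWithAutoCancelDisabled()
    println("first=$first second=$second")
}
```

With this change the first subscriber should see only 1 and the second only 2. Keep in mind that with autoCancel disabled, anything emitted while nobody is subscribed is buffered (up to the buffer size) and replayed to the next subscriber, which may or may not be what you want.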