I am trying to wrap my head around Flux Sinks and cannot see the higher-level picture. When I call tryEmitNext on a Sinks.Many<T>, the result tells me whether there was contention, and I have to decide what to do in case of failure (FailFast/Handler).
But is there a simple construct that lets me safely emit elements from multiple threads? For example, instead of telling the caller that there was contention and that it should try again, it could add elements to a queue (MPMC, MPSC, etc.) and only report a failure when the queue is full.
I can add a queue myself to alleviate the problem, but this seems like a common use case, so I guess I am missing something here.
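To make the idea concrete, here is a minimal sketch of such a queue-in-front construct in plain Java. It is not a Reactor API: the names QueueingEmitter, tryEmit, and the capacity parameter are hypothetical, and the downstream is modeled as a plain Consumer. It uses the usual queue-drain pattern with a work-in-progress counter, so callers never see contention and only get false when the bounded queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper (not a Reactor class): funnels concurrent emissions through a
// bounded queue so the downstream consumer is only ever invoked by one thread at a time.
final class QueueingEmitter<T> {
    private final BlockingQueue<T> queue;
    private final Consumer<T> downstream;
    // "Work in progress" counter: non-zero while some thread is draining the queue.
    private final AtomicInteger wip = new AtomicInteger();

    QueueingEmitter(int capacity, Consumer<T> downstream) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.downstream = downstream;
    }

    /** Returns false only when the queue is full; contention is never reported. */
    boolean tryEmit(T value) {
        if (!queue.offer(value)) {
            return false;
        }
        drain();
        return true;
    }

    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another thread is draining; it will notice the increment and loop again
        }
        int missed = 1;
        do {
            T next;
            while ((next = queue.poll()) != null) {
                downstream.accept(next);
            }
            // Subtract the requests handled so far; non-zero means more arrived meanwhile.
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }
}

// Small demo: four producer threads, one unsynchronized counter downstream.
final class QueueingEmitterDemo {
    public static void main(String[] args) throws InterruptedException {
        int[] received = {0}; // plain counter; safe because delivery is serialized
        QueueingEmitter<Integer> emitter = new QueueingEmitter<>(1024, v -> received[0]++);

        Thread[] producers = new Thread[4];
        for (int t = 0; t < producers.length; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    while (!emitter.tryEmit(i)) {
                        Thread.yield(); // queue full: back off and retry
                    }
                }
            });
            producers[t].start();
        }
        for (Thread p : producers) {
            p.join();
        }
        System.out.println("received " + received[0]);
    }
}
```

To put this in front of a Reactor sink, the downstream consumer could be something like value -> sink.emitNext(value, Sinks.EmitFailureHandler.FAIL_FAST), on the assumption that nothing else emits to that sink directly. The drain loop then guarantees that only one thread calls the sink at a time. Note that whichever producer wins the drain role also delivers items queued by the others, so its call can take longer than a plain offer.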
I hit the same issue while migrating from Processors, which support safe emission from multiple threads. I use this custom EmitFailureHandler to busy-loop, as suggested by the EmitFailureHandler docs:
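    import java.util.concurrent.locks.LockSupport;
    import reactor.core.publisher.Sinks.EmitFailureHandler;
    import reactor.core.publisher.Sinks.EmitResult;

    // Retries while another thread is emitting; any other failure goes to the fallback.
    public static EmitFailureHandler retryOnNonSerializedElse(EmitFailureHandler fallback) {
        return (signalType, emitResult) -> {
            if (emitResult == EmitResult.FAIL_NON_SERIALIZED) {
                LockSupport.parkNanos(10); // brief pause before the next attempt
                return true;               // true tells emitNext to retry
            } else {
                return fallback.onEmitFailure(signalType, emitResult);
            }
        };
    }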
There are various confusing aspects about the 3.4.0 implementation
I hope there will be a solidly engineered alternative to this offered by the library at some point.