In production, our application generated the following stacktrace. There's none of our own code here - not really much to go on.
Can someone who understands the akka-stream library explain:
Play version: 2.8.18 Akka version: 2.6.20
2023-11-06 09:20:05 GMT [ERROR] p.c.s.c.WebSocketFlowHandler [WebSocketFlowHandler.scala:249] - WebSocket flow threw exception
java.lang.IllegalStateException: Sink.asPublisher(fanout = false) only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.11)
at akka.stream.impl.ReactiveStreamsCompliance$.rejectAdditionalSubscriber(ReactiveStreamsCompliance.scala:62)
at akka.stream.impl.VirtualPublisher.rec$6(StreamLayout.scala:481)
at akka.stream.impl.VirtualPublisher.subscribe(StreamLayout.scala:486)
at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.preStart(ActorGraphInterpreter.scala:148)
at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:306)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:619)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:727)
at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:770)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:788)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
Without seeing your code (ie, what you're trying to achieve), it's hard to know what's going wrong. But what it indicates is that somewhere, you've taken a Publisher produced by Sink.asPublisher, and tried to subscribe to it twice. This is not possible, since the publisher returned by Sink.asPublisher represents one running instance of a stream, and there's no way to restart that stream for a second subscriber if you subscribe a second time.
Conceptually thinking, when you receive a WebSocket connection, and you want to consume the incoming stream of messages, you can supply a sink to do that. Sink.asPublisher turns that stream into a Reactive Streams publisher that you can consume messages from with other reactive streams libraries. When you subscribe to it, the supplied subscriber will be connected to the existing WebSocket connection. But if you try to subscribe from that publisher a second time, what can it do? The WebSocket server can't go back to the WebSocket client and say "hey, can you connect to me again so I can have another stream of messages to send to this new subscriber?" No, it can only fail, saying that you can't subscribe twice.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With