
IllegalStateException - Sink.asPublisher only supports one subscriber - when using WebSockets

In production, our application generated the following stack trace. None of our own code appears in it, so there is not much to go on.

Can someone who understands the akka-stream library explain:

  1. What's happening here?
  2. Is this just an underlying bug in Play? We have not reproduced it in our fairly similar test environment, so it is not practical to produce the test case that the Play Framework GitHub project requires.

Play version: 2.8.18; Akka version: 2.6.20

2023-11-06 09:20:05 GMT [ERROR] p.c.s.c.WebSocketFlowHandler [WebSocketFlowHandler.scala:249] - WebSocket flow threw exception
java.lang.IllegalStateException: Sink.asPublisher(fanout = false) only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.11)
        at akka.stream.impl.ReactiveStreamsCompliance$.rejectAdditionalSubscriber(ReactiveStreamsCompliance.scala:62)
        at akka.stream.impl.VirtualPublisher.rec$6(StreamLayout.scala:481)
        at akka.stream.impl.VirtualPublisher.subscribe(StreamLayout.scala:486)
        at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.preStart(ActorGraphInterpreter.scala:148)
        at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:306)
        at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:619)
        at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:727)
        at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:770)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:788)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
Philip Whitehouse asked Dec 20 '25 14:12



1 Answer

Without seeing your code (i.e., what you're trying to achieve), it's hard to know what's going wrong. What the exception does indicate is that somewhere you've taken a Publisher produced by Sink.asPublisher(fanout = false) and subscribed to it twice. That is not possible: the publisher returned by Sink.asPublisher represents one running instance of a stream, and there is no way to restart that stream for a second subscriber.

Conceptually, when you receive a WebSocket connection and want to consume the incoming stream of messages, you supply a sink to do that. Sink.asPublisher turns that stream into a Reactive Streams publisher that you can consume messages from with other Reactive Streams libraries. When you subscribe to it, the supplied subscriber is connected to the existing WebSocket connection. But if you try to subscribe to that publisher a second time, what can it do? The WebSocket server can't go back to the WebSocket client and say "hey, can you connect to me again so I can have another stream of messages to send to this new subscriber?" It can only fail, saying that you can't subscribe twice.
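To make the idea concrete, here is a minimal sketch using only the JDK's java.util.concurrent.Flow interfaces (the JDK's copy of the Reactive Streams interfaces). It is not Akka's implementation: UnicastPublisher and RecordingSubscriber are hypothetical names invented for illustration, and the iterator merely stands in for the single live WebSocket connection. The sketch also skips parts of the specification, such as rejecting non-positive demand.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical unicast publisher: one underlying stream, so at most one subscriber. */
final class UnicastPublisher<T> implements Flow.Publisher<T> {
    private final Iterator<T> source; // stands in for the single live connection
    private final AtomicBoolean subscribed = new AtomicBoolean(false);

    UnicastPublisher(Iterable<T> elements) {
        this.source = elements.iterator();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (!subscribed.compareAndSet(false, true)) {
            // Second subscriber: hand over a no-op subscription, then fail it.
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onError(new IllegalStateException("only supports one subscriber"));
            return;
        }
        // First subscriber: emit from the one and only stream on demand.
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override public void request(long n) {
                if (done) return;
                for (long i = 0; i < n && !done && source.hasNext(); i++) {
                    subscriber.onNext(source.next());
                }
                if (!done && !source.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() {
                done = true;
            }
        });
    }
}

/** Hypothetical subscriber that requests everything and records each signal. */
final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
    final List<String> events = new ArrayList<>();

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { events.add("next:" + item); }
    @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
    @Override public void onComplete() { events.add("complete"); }
}

final class SingleSubscriberDemo {
    public static void main(String[] args) {
        UnicastPublisher<Integer> publisher = new UnicastPublisher<>(List.of(1, 2, 3));
        RecordingSubscriber<Integer> first = new RecordingSubscriber<>();
        RecordingSubscriber<Integer> second = new RecordingSubscriber<>();
        publisher.subscribe(first);   // gets the stream
        publisher.subscribe(second);  // rejected: the stream is already taken
        System.out.println(first.events);  // [next:1, next:2, next:3, complete]
        System.out.println(second.events); // [error:only supports one subscriber]
    }
}
```

The second subscriber receives only an error, which is the same shape of failure as in the stack trace above. In Akka itself, Sink.asPublisher(fanout = true) does allow several subscribers to share one running stream, but whether that is appropriate depends on why the publisher is being subscribed to twice in the first place.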

James Roper answered Dec 24 '25 11:12



