I'd like to listen on a websocket using akka streams. That is, I'd like to treat it as nothing but a Source
.
However, all official examples treat the websocket connection as a Flow
.
My current approach is using the websocketClientFlow
in combination with a Source.maybe
. This eventually results in the upstream failing due to a TcpIdleTimeoutException
, when there are no new Message
s being sent down the stream.
Therefore, my question is twofold:
Source
?Flow
is the only option, how does one handle the TcpIdleTimeoutException
properly? The exception can not be handled by providing a stream supervision strategy. Restarting the source by using a RestartSource
doesn't help either, because the source is not the problem.So I tried two different approaches, setting the idle timeout to 1 second for convenience
application.conf
akka.http.client.idle-timeout = 1s
Source.<Message>maybe()
.keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
.viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
{ ... }
When doing this, the Upstream still fails with a TcpIdleTimeoutException
.
However, I found out about this approach, using a RestartFlow
:
final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
Duration.apply(3, TimeUnit.SECONDS),
Duration.apply(30, TimeUnit.SECONDS),
0.2,
() -> createWebsocketFlow(system, websocketUri)
);
Source.<Message>maybe()
.viaMat(restartWebsocketFlow, Keep.right()) // One can treat this part of the resulting graph as a `Source<Message, NotUsed>`
{ ... }
(...)
private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}
This works in that I can treat the websocket as a Source (although artifically, as explained by Stefano) and keep the tcp connection alive by restarting the websocketClientFlow
whenever an Exception
occurs.
This doesn't feel like the optimal solution though.
No. WebSocket is a bidirectional channel, and Akka-HTTP therefore models it as a Flow
. If in your specific case you care only about one side of the channel, it's up to you to form a Flow
with a "muted" side, by using either Flow.fromSinkAndSource(Sink.ignore, mySource)
or Flow.fromSinkAndSource(mySink, Source.maybe)
, depending on the case.
as per the documentation:
Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.
There is an ad-hoc combinator to inject keep-alive messages, see the example below and this Akka cookbook recipe. NB: this should happen on the client side.
src.keepAlive(1.second, () => TextMessage.Strict("ping"))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With