
Http Websocket as Akka Stream Source

I'd like to listen on a websocket using akka streams. That is, I'd like to treat it as nothing but a Source.

However, all official examples treat the websocket connection as a Flow.

My current approach combines webSocketClientFlow with a Source.maybe. When no new messages are sent down the stream, the upstream eventually fails with a TcpIdleTimeoutException.

Therefore, my question is twofold:

  1. Is there a way – which I obviously missed – to treat a websocket as just a Source?
  2. If using the Flow is the only option, how does one handle the TcpIdleTimeoutException properly? The exception cannot be handled by providing a stream supervision strategy. Wrapping the source in a RestartSource doesn't help either, because the source is not the part that fails.

Update

I tried two different approaches, setting the idle timeout to 1 second for convenience:

application.conf

akka.http.client.idle-timeout = 1s

Using keepAlive (as suggested by Stefano)

Source.<Message>maybe()
    .keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
    .viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
    { ... }

With this setup, the upstream still fails with a TcpIdleTimeoutException.

Using RestartFlow

However, I then found this approach, which uses a RestartFlow:

final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
    Duration.apply(3, TimeUnit.SECONDS),
    Duration.apply(30, TimeUnit.SECONDS),
    0.2,
    () -> createWebsocketFlow(system, websocketUri)
);

Source.<Message>maybe()
    .viaMat(restartWebsocketFlow, Keep.right()) // One can treat this part of the resulting graph as a `Source<Message, NotUsed>`
    { ... }

(...)

private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
    return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}

This works in that I can treat the websocket as a Source (although artificially, as explained by Stefano) and recover the TCP connection by restarting the webSocketClientFlow whenever an exception occurs.

It doesn't feel like the optimal solution, though.

Rea Sand asked Jan 08 '18 15:01


1 Answer

  1. No. WebSocket is a bidirectional channel, and Akka HTTP therefore models it as a Flow. If in your specific case you care about only one side of the channel, it's up to you to form a Flow with a "muted" side: use Flow.fromSinkAndSource(Sink.ignore, mySource) to ignore incoming messages and only send, or Flow.fromSinkAndSource(mySink, Source.maybe) to only receive and never send.

  2. As per the documentation:

    Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.

    There is an ad-hoc combinator to inject keep-alive messages, see the example below and this Akka cookbook recipe. NB: this should happen on the client side.

    src.keepAlive(1.second, () => TextMessage.Strict("ping"))
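
For the "tweak your idle-timeout" option quoted above, a minimal application.conf sketch might look like the following. The values are illustrative assumptions, not recommendations. The websocket block is also an assumption: the periodic-keep-alive-* settings are only available in newer Akka HTTP releases (10.1.x or later, to my knowledge), so check the reference.conf of the version in use before relying on them.

```
akka.http.client {
  # Assumed value: longer than the longest expected quiet period on the socket.
  # Setting this to "infinite" disables the idle timeout entirely.
  idle-timeout = 5 minutes

  websocket {
    # Assumption: requires a newer Akka HTTP release.
    # Sends protocol-level Ping frames when the connection has been idle.
    periodic-keep-alive-mode = ping
    # Assumed value: should be shorter than idle-timeout above.
    periodic-keep-alive-max-idle = 30 seconds
  }
}
```

Whichever keep-alive mechanism is used, its interval presumably needs to be shorter than the idle timeout. With both set to 1 second, as in the question's update, the keep-alive message may be injected too late to reset the timer.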

Stefano Bonetti answered Nov 02 '22 02:11