
Definition of Akka HTTP client-side websocket streams

I've successfully used Akka Streams in the past; however, I'm currently having a hard time understanding why client-side WebSocket streams in Akka HTTP are defined and work the way the documentation shows.

Since a WebSocket connection allows full-duplex communication, I'd have expected such a connection to be represented by two separate streams in Akka HTTP, one for incoming traffic and one for outgoing traffic. And indeed, the documentation states the following:

A WebSocket consists of two streams of messages [...]

It further states that incoming messages are represented by a Sink and outgoing messages by a Source. That's my first point of confusion: when using two separate streams, you'd expect to deal with two sources and two sinks in total instead of just one of each kind. Currently, my guess is that the source of the incoming stream and the sink of the outgoing stream are of little use to the developer and are therefore just "hidden".

However, it really gets confusing when wiring everything together (see the docs linked above).

The part in question when using singleWebSocketRequest:

val flow: Flow[Message, Message, Future[Done]] =
      Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

Or the same part when using webSocketClientFlow:

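val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()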

This contradicts my current understanding of the workflow of streams.

  • Why would I want to combine the Source for outgoing messages and the Sink for incoming messages? The code above looks like I'm sending messages to myself and not to a server.
  • Furthermore, what are the semantics of Flow[Message, Message, ...]? Transforming outgoing messages to incoming messages doesn't seem to make sense.
  • Shouldn't there be two streams instead of one?

Any help in improving my understanding is appreciated, thanks.


I have no problems using the Source and Sink and sending data over the WebSocket; I just want to understand why the stages are wired like this.

asked Mar 08 '23 14:03 by ceran


1 Answer

The WebSocket does consist of two separate streams; it's just that those streams are (likely) not on the same JVM.

You have two peers communicating, one being the server and the other the client, but once the WebSocket connection is established that distinction no longer matters. One stream of data is peer 1 sending messages to peer 2, the other is peer 2 sending messages to peer 1, and there is a network boundary between the two peers. Looking at one peer at a time, peer 1 receives messages from peer 2 on one stream and sends messages to peer 2 on the other.

Each peer has a Sink for the receiving part and a Source for the sending part. You do actually have two Sources and two Sinks in total, just not both on the same ActorSystem (assuming for the sake of explanation that both peers are implemented in Akka HTTP). The Source from peer 1 is connected to the Sink of peer 2 and the Source of peer 2 is connected to the Sink of peer 1.

So, you write a Sink that describes how to deal with incoming messages over the first stream and a Source that describes how to send messages over the second stream. Often you want to produce messages based on the ones you are receiving, so you can connect the two and route the messages through flows that describe how to react to incoming messages and produce any number of outgoing messages. The Flow[Message, Message, _] does not mean that you're transforming outgoing messages into incoming messages, but rather incoming messages into outgoing messages.
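To make that direction concrete, here is a minimal sketch of a handler flow that turns each incoming message into two outgoing ones. The object name, the "ack" reply, and the canned input are made up for illustration; it assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer) and runs locally, with no network involved:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object HandlerFlowSketch {
  // Hypothetical handler: each strict text message coming IN produces two
  // messages going OUT (an "ack" plus an echo); anything else produces nothing.
  val handler: Flow[Message, Message, NotUsed] =
    Flow[Message].mapConcat {
      case TextMessage.Strict(text) => List[Message](TextMessage("ack"), TextMessage(text))
      case _                        => Nil
    }

  // Feeds canned "incoming" messages through the handler and collects
  // what it would send back.
  def demo(): Seq[Message] = {
    implicit val system: ActorSystem = ActorSystem("handler-flow-sketch")
    try {
      val outgoing = Source(List[Message](TextMessage("a"), TextMessage("b")))
        .via(handler)
        .runWith(Sink.seq[Message])
      Await.result(outgoing, 3.seconds)
    } finally system.terminate()
  }
}
```

The input side of the flow is what the other peer sends you, and the output side is what you send back, which is why the type reads Flow[Message, Message, _].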

The webSocketFlow is a typical asynchronous boundary, a flow that represents the other peer. It "transforms" outgoing messages into incoming messages by sending them to the other peer and emitting whatever the other peer sends.
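One way to picture this is to replace the real webSocketFlow with a local stand-in. The sketch below is only an illustration: fakePeer is a hypothetical flow that plays the role of the remote side, the names and message texts are invented, and Akka 2.6+ is assumed for the implicit materializer:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FakePeerSketch {
  // Hypothetical stand-in for webSocketFlow: the "other peer" receives what
  // we send and emits what it wants us to receive.
  val fakePeer: Flow[Message, Message, NotUsed] =
    Flow[Message].collect {
      case TextMessage.Strict(text) => TextMessage(s"peer got: $text")
    }

  def demo(): Seq[Message] = {
    implicit val system: ActorSystem = ActorSystem("fake-peer-sketch")
    try {
      val outgoing = Source(List[Message](TextMessage("ping"), TextMessage("pong"))) // what we send
      val incoming = Sink.seq[Message]                                               // what we receive
      val received = outgoing.via(fakePeer).toMat(incoming)(Keep.right).run()
      Await.result(received, 3.seconds)
    } finally system.terminate()
  }
}
```

From the client's point of view the shape is the same as with a real connection: your Source feeds the peer, and the peer feeds your Sink.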

val flow: Flow[Message, Message, Future[Done]] =
      Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

This flow is your peer's half of the two streams:

  1. [message from other peer] connected to printSink
  2. helloSource connected to [message to the other peer]

There is no relation between incoming and outgoing messages: you just print everything you receive and send a single "hello world!". Actually, since the source completes after one message, the WebSocket connection closes as well; but if you replace the Source with, for example, Source.repeat, you'd be constantly sending (flooding, really) "hello world!" over the wire, regardless of the rate of incoming messages.
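The independence of the two halves can be checked locally. This is a sketch under assumptions: the printing sink is swapped for Sink.seq so the received messages can be inspected, the "messages from the other peer" are canned values, all names are illustrative, and Akka 2.6+ is assumed:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object IndependentHalvesSketch {
  // Returns (messages our sink received, messages our source sent).
  def demo(): (Seq[Message], Seq[Message]) = {
    implicit val system: ActorSystem = ActorSystem("independent-halves-sketch")
    try {
      val receivedSink = Sink.seq[Message] // stands in for printSink
      val helloSource  = Source.single[Message](TextMessage("hello world!"))

      // Same construction as in the documentation snippet: the sink and the
      // source are packaged as one Flow but are not connected to each other.
      val flow = Flow.fromSinkAndSourceMat(receivedSink, helloSource)(Keep.left)

      // Canned messages playing the role of "message from other peer".
      val fromPeer = Source(List[Message](TextMessage("x"), TextMessage("y"), TextMessage("z")))

      val (received, sent) =
        fromPeer.viaMat(flow)(Keep.right).toMat(Sink.seq[Message])(Keep.both).run()

      (Await.result(received, 3.seconds), Await.result(sent, 3.seconds))
    } finally system.terminate()
  }
}
```

Three messages go in and one unrelated message comes out, which shows that nothing is being "transformed" inside the flow.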

val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()

Here you take everything coming from outgoing, which are the messages you want to send, and route it through the webSocketFlow, which "transforms" the messages by communicating with the other peer and emits every received message into incoming. Often you have a wire protocol where you encode and decode your case class/POJO/DTO messages into and from the wire format.

val encode: Flow[T, Message, _] = ???
val decode: Flow[Message, T, _] = ???

val upgradeResponse = outgoing
  .via(encode)
  .viaMat(webSocketFlow)(Keep.right)
  .via(decode)
  .to(incoming)
  .run()
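As a sketch of what encode and decode could look like, assume the application type T is simply String and that only strict text messages need to be handled (a real client would also have to deal with streamed and binary messages). The object and method names are invented, and Akka 2.6+ is assumed:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object WireProtocolSketch {
  // Assumption: T = String. Application value -> wire format.
  val encode: Flow[String, Message, NotUsed] =
    Flow[String].map(text => TextMessage(text))

  // Wire format -> application value; non-strict and binary messages are dropped here.
  val decode: Flow[Message, String, NotUsed] =
    Flow[Message].collect { case TextMessage.Strict(text) => text }

  // Connects encode directly to decode, leaving out the network hop in between.
  def roundTrip(values: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("wire-protocol-sketch")
    try {
      val decoded = Source(values).via(encode).via(decode).runWith(Sink.seq[String])
      Await.result(decoded, 3.seconds)
    } finally system.terminate()
  }
}
```

In a real client the webSocketFlow would sit between encode and decode, so your own Source and Sink only ever see values of type T.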

Or you can imagine some kind of chat server (ah, WebSockets and chats) that broadcasts and merges messages from and to a number of clients. This should take any message from any client and send it to every client (for demonstration only, untested and probably not what you would want for an actual chat server):

val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
val chatClientSenders: Seq[Source[Message, NotUsed]] = ???

// each of those receivers/senders could be paired in their own websocket compatible flow
val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
  (chatClientReceivers, chatClientSenders).zipped.map(
    (outgoingSendToClient, incomingFromClient) =>
      Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))

val toClients: Graph[SinkShape[Message], NotUsed] =
  GraphDSL.create() {implicit b =>
    import GraphDSL.Implicits._

    val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))

    (broadcast.outArray, chatClientReceivers).zipped
      .foreach((bcOut, client) => bcOut ~> b.add(client).in)

    SinkShape(broadcast.in)
  }

val fromClients: Graph[SourceShape[Message], NotUsed] =
  GraphDSL.create() {implicit b =>
    import GraphDSL.Implicits._

    val merge = b.add(Merge[Message](chatClientSenders.size))

    (merge.inSeq, chatClientSenders).zipped
      .foreach((mIn, client) => b.add(client).out ~> mIn)

    SourceShape(merge.out)
  }

val upgradeResponse: Future[WebSocketUpgradeResponse] =
  Source.fromGraph(fromClients)
    .viaMat(webSocketFlow)(Keep.right)
    .to(toClients)
    .run()

Hope this helps a bit.

answered Mar 11 '23 04:03 by knutwalker