In my scenario, a client sends a "goodbye" WebSocket message and I need to close the previously established connection on the server side.
From akka-http docs:
Closing connections is possible by cancelling the incoming connection Flow from your server logic (e.g. by connecting its downstream to a Sink.cancelled and its upstream to a Source.empty). It is also possible to shut down the server's socket by cancelling the IncomingConnection source connections.
But it's not clear to me how to do that, given that the Sink and Source are set once when negotiating a new connection:
    (get & path("ws")) {
      optionalHeaderValueByType[UpgradeToWebsocket]() {
        case Some(upgrade) ⇒
          val connectionId = UUID()
          complete(upgrade.handleMessagesWithSinkSource(sink, source))
        case None ⇒
          reject(ExpectedWebsocketRequestRejection)
      }
    }
HINT: This answer is based on akka-stream-experimental version 2.0-M2. The API may be slightly different in other versions.
An easy way to close the connection is by using a PushStage:
    import akka.stream.stage._

    val closeClient = new PushStage[String, String] {
      override def onPush(elem: String, ctx: Context[String]) = elem match {
        case "goodbye" ⇒
          // println("Connection closed")
          ctx.finish()
        case msg ⇒
          ctx.push(msg)
      }
    }
Every element that is received on the client side or on the server side (and, in general, every element that goes through a Flow) passes through such a Stage component. In Akka, the full abstraction is called GraphStage; more information can be found in the official documentation.
With a PushStage we can inspect the value of each incoming element and then act on the context accordingly. In the example above, once the goodbye message is received we finish the context, which completes the stream; otherwise we just forward the value through the push method.
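The same logic can also be written against the GraphStage abstraction mentioned above. What follows is only a minimal sketch, assuming a later Akka release (2.6 or newer, where an implicit ActorSystem is enough to run a stream). The names CloseOnGoodbye and CloseOnGoodbyeDemo are made up for illustration and are not part of the original answer.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage: forwards every string until "goodbye" arrives,
// then completes the stream (downstream completes, upstream is cancelled).
final class CloseOnGoodbye extends GraphStage[FlowShape[String, String]] {
  val in: Inlet[String] = Inlet("CloseOnGoodbye.in")
  val out: Outlet[String] = Outlet("CloseOnGoodbye.out")
  override val shape: FlowShape[String, String] = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Called when upstream delivers an element.
      override def onPush(): Unit = grab(in) match {
        case "goodbye" => completeStage()
        case msg       => push(out, msg)
      }
      // Called when downstream asks for the next element.
      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}

object CloseOnGoodbyeDemo {
  // Runs a small in-memory stream through the stage and returns what came out.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      val result = Source(List("hello", "goodbye", "ignored"))
        .via(new CloseOnGoodbye)
        .runWith(Sink.seq)
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }
}
```

In this sketch only the elements before "goodbye" reach the sink; the "goodbye" element itself and everything after it are dropped because the stage has already completed.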
Now, we can connect the closeClient component to an arbitrary flow through the transform method:
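    val connection = Tcp().outgoingConnection(address, port)

    val flow = Flow[ByteString]
      // split the incoming bytes into newline-delimited frames
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 256,
        allowTruncation = true))
      .map(_.utf8String)
      // finish the stream as soon as "goodbye" is received
      .transform(() ⇒ closeClient)
      // discard the received message and read a reply from stdin instead
      .map(_ ⇒ StdIn.readLine("> "))
      .map(_ + "\n")
      .map(ByteString(_))

    connection.join(flow).run()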
The flow above receives a ByteString and returns a ByteString, which means it can be connected to connection through the join method. Inside the flow we first convert the bytes to a string before we send them to closeClient. If the PushStage doesn't finish the stream, the element is forwarded downstream, where it is dropped and replaced by some input from stdin, which is then sent back over the wire. If the stream is finished, none of the processing steps after the stage component run any more: the stream is now closed.
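To tie this back to the WebSocket server in the question: in later akka-http releases the same idea can be sketched with the built-in takeWhile operator instead of a custom stage. This is only a sketch under assumptions, namely akka-http 10.x with Akka 2.6 or newer, a hypothetical echo handler, and made-up names (GoodbyeWebSocketSketch, handler, run). My understanding is that akka-http closes the WebSocket connection once the handler flow completes, but check that against the documentation of the version you use.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object GoodbyeWebSocketSketch {
  // Hypothetical echo handler: the flow completes as soon as the client
  // sends a strict text message "goodbye". Streamed text messages are not
  // inspected here; a real handler would have to collect them first.
  val handler: Flow[Message, Message, Any] =
    Flow[Message]
      .takeWhile {
        case TextMessage.Strict("goodbye") => false
        case _                             => true
      }
      .collect { case TextMessage.Strict(text) => TextMessage(s"echo: $text") }

  // Route that hands WebSocket upgrade requests on /ws to the handler.
  val route: Route = path("ws") {
    handleWebSocketMessages(handler)
  }

  // Exercises the handler in memory, without binding a server.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      val replies = Source(List[Message](
          TextMessage("hi"), TextMessage("goodbye"), TextMessage("late")))
        .via(handler)
        .collect { case TextMessage.Strict(text) => text }
        .runWith(Sink.seq)
      Await.result(replies, 3.seconds)
    } finally system.terminate()
  }
}
```

Because the termination logic lives inside the flow itself, there is no need to swap the Sink and Source after the connection has been negotiated; the flow simply ends when it sees the goodbye message.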