Here is my WebSocket server implementation:
val route = get {
  pathEndOrSingleSlash {
    handleWebSocketMessages(websocketFlow)
  }
}

def websocketFlow: Flow[Message, Message, Any] =
  Flow[Message]
    .collect { case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage) }
    .via(chatActorFlow(UUID.randomUUID()))
    .map(event => TextMessage.Strict(protocol.serialize(event)))

def chatActorFlow(connectionId: UUID): Flow[Protocol.Message, Protocol.Event, Any] = {
  val sink = Flow[Protocol.Message]
    .map(msg => Protocol.SignedMessage(connectionId, msg))
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

  val source = Source
    .actorRef[Protocol.Event](16, OverflowStrategy.fail)
    .mapMaterializedValue { actor: ActorRef =>
      chatRef ! Protocol.OpenConnection(actor, connectionId)
    }

  Flow.fromSinkAndSource(sink, source)
}
I'm wondering whether there is any way to close the connection once a message of type ConnectionClosed is sent by chatRef.
close(): The WebSocket.close() method closes the WebSocket connection or connection attempt, if any. If the connection is already CLOSED, this method does nothing.
WebSocket is a bidirectional communication protocol that can send the data from the client to the server or from the server to the client by reusing the established connection channel. The connection is kept alive until terminated by either the client or the server.
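However, some servers and hosting platforms also close the connection between a client and your WebSocket app when no traffic is sent between them for a configured idle period, for example 60 seconds (which is also the default of Akka HTTP's akka.http.server.idle-timeout setting).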
Closing a WebSocket connection (the WebSocket close handshake): to close a WebSocket connection, a closing frame is sent (opcode 0x08). In addition to the opcode, the close frame may contain a body that indicates the reason for closing.
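To make the close frame described above concrete, here is a minimal sketch of how an unmasked (server-to-client) close frame could be laid out, assuming the RFC 6455 format: a FIN bit plus opcode 0x08, a one-byte length, a two-byte status code, and a UTF-8 reason. The object name `CloseFrameSketch` and the `encode` helper are made up for illustration. With Akka HTTP you would not normally build this frame yourself; as far as I know the library emits it when the outgoing stream completes.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper, for illustration only: not part of Akka HTTP.
object CloseFrameSketch {
  val CloseOpcode = 0x08
  val NormalClosure = 1000 // status code for a regular close

  /** Builds an unmasked close frame: header, 2-byte status code, UTF-8 reason. */
  def encode(statusCode: Int, reason: String): Array[Byte] = {
    val reasonBytes = reason.getBytes(UTF_8)
    // Control frame payloads are limited to 125 bytes; 2 are used by the status code.
    require(reasonBytes.length <= 123, "close reason is too long for a control frame")
    val payloadLength = 2 + reasonBytes.length
    val header = Array[Byte](
      (0x80 | CloseOpcode).toByte, // FIN bit set, opcode 0x08
      payloadLength.toByte         // mask bit 0, 7-bit payload length
    )
    // Status code in network byte order (big-endian).
    val status = Array[Byte]((statusCode >> 8).toByte, (statusCode & 0xFF).toByte)
    header ++ status ++ reasonBytes
  }
}
```

For example, `CloseFrameSketch.encode(CloseFrameSketch.NormalClosure, "bye")` yields a 7-byte frame whose first byte is 0x88.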
The solution below lets you drop connections from the server side by terminating the actor materialized by the Source.actorRef stage. This is done simply by sending it a PoisonPill.
It is still not clear to me how you'd like to identify a "banned" client at connection time, so the example is deliberately very simple: the server drops any new connection once the maximum number of clients is connected. If you want to use another strategy to kick out clients at any time, you can apply the same logic and send a PoisonPill to their source actors.
import java.util.UUID

import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{ExecutionContextExecutor, Future}

object ChatApp extends App {
  import Protocol._

  implicit val system: ActorSystem = ActorSystem("chat")
  implicit val executor: ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val route = get {
    pathEndOrSingleSlash {
      handleWebSocketMessages(websocketFlow)
    }
  }

  val maximumClients = 1

  class ChatRef extends Actor {
    override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

    def withClients(clients: Map[UUID, ActorRef]): Receive = {
      // Echo the message back to the connection it came from.
      case SignedMessage(uuid, msg) => clients.get(uuid).foreach(_ ! msg)
      // Server is full: stopping the source actor completes the outgoing stream,
      // which closes the connection from the server side.
      case OpenConnection(ar, _) if clients.size >= maximumClients => ar ! PoisonPill
      case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar)))
      // Sent by the sink once the incoming side of the connection completes.
      case CloseConnection(uuid) => context.become(withClients(clients - uuid))
    }
  }

  object Protocol {
    case class SignedMessage(uuid: UUID, msg: String)
    case class OpenConnection(actor: ActorRef, uuid: UUID)
    case class CloseConnection(uuid: UUID)
  }

  val chatRef = system.actorOf(Props[ChatRef])

  def websocketFlow: Flow[Message, Message, Any] =
    Flow[Message]
      .mapAsync(1) {
        case TextMessage.Strict(s) => Future.successful(s)
        // Collect a streamed text message into a single string.
        case TextMessage.Streamed(s) => s.runFold("")(_ + _)
        case b: BinaryMessage => throw new Exception("Binary message cannot be handled")
      }
      .via(chatActorFlow(UUID.randomUUID()))
      .map(TextMessage(_))

  def chatActorFlow(connectionId: UUID): Flow[String, String, Any] = {
    // Incoming messages are tagged with the connection id and forwarded to chatRef.
    val sink = Flow[String]
      .map(msg => Protocol.SignedMessage(connectionId, msg))
      .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

    // Outgoing messages are whatever chatRef sends to the materialized actor.
    val source = Source
      .actorRef[String](16, OverflowStrategy.fail)
      .mapMaterializedValue { actor: ActorRef =>
        chatRef ! Protocol.OpenConnection(actor, connectionId)
      }

    Flow.fromSinkAndSource(sink, source)
  }

  Http()
    .bindAndHandle(route, "0.0.0.0", 8080)
    .map(_ => println("Started server..."))
}
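To kick a client at an arbitrary time rather than only at connection time, the bookkeeping in ChatRef could be extended roughly as follows. This is only a sketch of the registry logic, kept free of Akka so it runs on its own: `Registry`, `open`, `kick`, and `close` are hypothetical names, and the `Handle` type parameter stands in for the `ActorRef` materialized by Source.actorRef.

```scala
import java.util.UUID

// Hypothetical model of the client map held by ChatRef.
// Handle stands in for akka.actor.ActorRef so the sketch has no Akka dependency.
final case class Registry[Handle](
    maxClients: Int,
    clients: Map[UUID, Handle] = Map.empty[UUID, Handle]
) {

  /** Registers a connection, or returns its handle for termination if the server is full. */
  def open(id: UUID, handle: Handle): (Registry[Handle], Option[Handle]) =
    if (clients.size >= maxClients) (this, Some(handle))
    else (copy(clients = clients.updated(id, handle)), None)

  /** Removes a connection and returns the handle that should be terminated, if it was known. */
  def kick(id: UUID): (Registry[Handle], Option[Handle]) =
    (copy(clients = clients - id), clients.get(id))

  /** Forgets a connection that has already been closed by the client. */
  def close(id: UUID): Registry[Handle] = copy(clients = clients - id)
}
```

Inside ChatRef, a hypothetical `KickClient(uuid)` message could be handled by applying the same logic as `kick` and sending a PoisonPill to the returned source actor, exactly as the full-server case does.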