Akka websocket - how to close connection by server?

So here is my websocket server implementation.

val route = get {
  pathEndOrSingleSlash {
    handleWebSocketMessages(websocketFlow)
  }
}

def websocketFlow: Flow[Message, Message, Any] =
  Flow[Message]
    .collect { case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage) }
    .via(chatActorFlow(UUID.randomUUID()))
    .map(event => TextMessage.Strict(protocol.serialize(event)))


def chatActorFlow(connectionId: UUID) : Flow[Protocol.Message, Protocol.Event, Any] = {

  val sink = Flow[Protocol.Message]
    .map(msg => Protocol.SignedMessage(connectionId, msg))
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

  val source = Source
      .actorRef[Protocol.Event](16, OverflowStrategy.fail)
      .mapMaterializedValue {
        actor : ActorRef => {
          chatRef ! Protocol.OpenConnection(actor, connectionId)
        }
      }

  Flow.fromSinkAndSource(sink, source)
}

Is there any way to close the connection from the server once chatRef sends a message of type ConnectionClosed?

Norbert Orzechowicz asked Dec 24 '16 19:12


1 Answer

The solution below lets you drop connections from the server side by terminating the actor materialized by the Source.actorRef stage. You do this by sending that actor a PoisonPill.

It is still not clear to me how you would identify a "banned" client at connection time, so the example is deliberately simple: the server drops any new connection once the maximum number of clients is connected. To kick out clients at another time or by another criterion, apply the same logic and send a PoisonPill to that client's source actor.

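import java.util.UUID

import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{ExecutionContextExecutor, Future}

object ChatApp extends App {

  // Bring SignedMessage, OpenConnection and CloseConnection into scope.
  import Protocol._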

  implicit val system = ActorSystem("chat")
  implicit val executor: ExecutionContextExecutor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val route = get {
    pathEndOrSingleSlash {
      handleWebSocketMessages(websocketFlow)
    }
  }

  val maximumClients = 1

  class ChatRef extends Actor {
    override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

    def withClients(clients: Map[UUID, ActorRef]): Receive = {
      // Echo the message back to the connection it came from, if still registered.
      case SignedMessage(uuid, msg) => clients.get(uuid).foreach(_ ! msg)
      case OpenConnection(ar, uuid) if clients.size == maximumClients => ar ! PoisonPill
      case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar)))
      case CloseConnection(uuid) => context.become(withClients(clients - uuid))
    }
  }

  object Protocol {
    case class SignedMessage(uuid: UUID, msg: String)
    case class OpenConnection(actor: ActorRef, uuid: UUID)
    case class CloseConnection(uuid: UUID)
  }

  val chatRef = system.actorOf(Props[ChatRef])

  def websocketFlow: Flow[Message, Message, Any] =
    Flow[Message]
      .mapAsync(1) {
        case TextMessage.Strict(s) => Future.successful(s)
        case TextMessage.Streamed(s) => s.runFold("")(_ + _)
        case b: BinaryMessage => throw new Exception("Binary message cannot be handled")
      }.via(chatActorFlow(UUID.randomUUID()))
      .map(TextMessage(_))

  def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = {

    val sink = Flow[String]
      .map(msg => Protocol.SignedMessage(connectionId, msg))
      .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

    val source = Source.actorRef[String](16, OverflowStrategy.fail)
      .mapMaterializedValue {
        actor : ActorRef => {
          chatRef ! Protocol.OpenConnection(actor, connectionId)
        }
      }

    Flow.fromSinkAndSource(sink, source)
  }

  Http().bindAndHandle(route, "0.0.0.0", 8080)
    .map(_ => println("Started server..."))

}
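
The same idea extends to kicking a specific client at any time. What follows is a minimal sketch, not part of the code above. It assumes the same classic Akka actor API and the same `OpenConnection` / `CloseConnection` messages (redefined here so the snippet stands alone). The `Kick` message and the `KickingChatRef` name are hypothetical, introduced only for illustration. It also relies on `PoisonPill` completing the `Source.actorRef` stage, as described above. Newer Akka releases changed how `Source.actorRef` is completed, so check the documentation for your version.

```scala
import java.util.UUID

import akka.actor.{Actor, ActorRef, PoisonPill}

// Same shape as the Protocol messages above, repeated to keep the sketch self-contained.
final case class OpenConnection(actor: ActorRef, uuid: UUID)
final case class CloseConnection(uuid: UUID)

// Hypothetical message: ask the chat actor to drop one connection.
final case class Kick(uuid: UUID)

class KickingChatRef extends Actor {
  override def receive: Receive = withClients(Map.empty)

  def withClients(clients: Map[UUID, ActorRef]): Receive = {
    case OpenConnection(ar, uuid) =>
      context.become(withClients(clients.updated(uuid, ar)))

    case CloseConnection(uuid) =>
      context.become(withClients(clients - uuid))

    case Kick(uuid) =>
      // Stop the client's source actor (if registered) and forget the client.
      clients.get(uuid).foreach(_ ! PoisonPill)
      context.become(withClients(clients - uuid))
  }
}
```

Any part of the application that holds a reference to this actor can then send `Kick(connectionId)` to close that client's outgoing stream.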
Stefano Bonetti answered Nov 08 '22 13:11