
How to send messages with the Akka HTTP client-side WebSocket

I'm trying out the client-side WebSocket API by following the documentation for webSocketClientFlow.

The sample code is:

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

object WebSocketClientFlow {
  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
          // ignore binary and streamed messages instead of failing with a MatchError
      }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular http request, the response status is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that the server supports WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

Once the connection has been upgraded, how do I use it to send messages to the WebSocket server?

I noticed from the doc:

The Flow that is returned by this method can only be materialized once. For each request a new flow must be acquired by calling the method again.

I'm still confused: why do we need to construct the flow multiple times when an upgraded connection is already available?

xring asked Oct 31 '16 15:10



1 Answer

You can create an actor-based source and send new messages over the established WebSocket connection.

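    // needed in addition to the imports and implicits from the question
    import akka.NotUsed
    import akka.actor.ActorRef
    import akka.stream.OverflowStrategy

    val req = WebSocketRequest(uri = "ws://127.0.0.1/ws")
    val webSocketFlow = Http().webSocketClientFlow(req)

    // materializes an ActorRef; every message sent to it is emitted into the stream
    val messageSource: Source[Message, ActorRef] =
        Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)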

    val messageSink: Sink[Message, NotUsed] =
        Flow[Message]
            .map(message => println(s"Received text message: [$message]"))
            .to(Sink.ignore)

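    // messageSink materializes NotUsed, so keep only the left side:
    // the ActorRef for sending and the Future[WebSocketUpgradeResponse]
    val (ws, upgradeResponse) =
        messageSource
            .viaMat(webSocketFlow)(Keep.both)
            .toMat(messageSink)(Keep.left)
            .run()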

    val connected = upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
            Future.successful(Done)
        } else {
            throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
        }
    }

    ws ! TextMessage.Strict("Hello World")
    ws ! TextMessage.Strict("Hi")
    ws ! TextMessage.Strict("Yay!")
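
As a complementary sketch (not part of the original answer), the same idea can probably also be written with `Source.queue`, whose `offer` returns a `Future` that reports whether each message was accepted. The flow is materialized once, and every message offered to the queue travels over that one connection, so a new flow should only be needed when opening another connection. The object name, the `open` helper, the buffer size of 10 and the `ws://127.0.0.1/ws` endpoint are illustrative assumptions, and the code assumes the same Akka-era API as the question (`ActorMaterializer`).

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}

import scala.concurrent.Future

object QueueBackedWebSocketClient {

  // Hypothetical helper: materializes `flow` exactly once and returns
  //  - a queue for sending messages over that single materialization,
  //  - the flow's own materialized value,
  //  - a Future with all strict text messages received until the stream completes.
  def open[M](flow: Flow[Message, Message, M])(implicit mat: Materializer)
      : (SourceQueueWithComplete[Message], M, Future[Seq[String]]) = {
    val collectTexts: Sink[Message, Future[Seq[String]]] =
      Flow[Message]
        .collect { case TextMessage.Strict(text) => text } // skip binary/streamed messages
        .toMat(Sink.seq[String])(Keep.right)

    val ((queue, flowMat), received) =
      Source.queue[Message](10, OverflowStrategy.backpressure)
        .viaMat(flow)(Keep.both)
        .toMat(collectTexts)(Keep.both)
        .run()

    (queue, flowMat, received)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // assumed endpoint, replace with a real WebSocket server
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://127.0.0.1/ws"))
    val (queue, upgradeResponse, received) = open(webSocketFlow)

    upgradeResponse.foreach { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        // offer the next message only after the previous one was accepted
        for {
          _ <- queue.offer(TextMessage("Hello World"))
          _ <- queue.offer(TextMessage("Hi"))
        } queue.complete() // completing the outgoing side ends this sketch
      } else {
        queue.complete()
      }
    }

    received.onComplete { result =>
      println(result)
      system.terminate()
    }
  }
}
```

Because `open` accepts any `Flow[Message, Message, M]`, it can be tried without a server by passing a stand-in such as `Flow[Message]`, which simply echoes whatever is offered to the queue.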


dimart.sp answered Nov 15 '22 06:11
