I'm trying to create a simple proxy for WebSocket connections using Play and Akka Streams. The traffic flow is like this:
(Client) request  ->  Proxy  ->  request  (Server)
(Client) response <-  Proxy  <-  response (Server)
I came up with the following code after following some examples:
def socket = WebSocket.accept[String, String] { request =>
  val uuid = UUID.randomUUID().toString

  // wsOut - actor that deals with incoming websocket frame from the Client
  // wsIn - publisher of the frame for the Server
  val (wsOut: ActorRef, wsIn: Publisher[String]) = {
    val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail)
    val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false)
    source.toMat(sink)(Keep.both).run()
  }

  // sink that deals with the incoming messages from the Server
  val serverIncoming: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict =>
        println("The server has sent: " + message.text)
    }

  // source for sending a message over the WebSocket
  val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_))

  // flow to use (note: not re-usable!)
  val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000"))

  // the materialized value is a tuple with
  // upgradeResponse is a Future[WebSocketUpgradeResponse] that
  // completes or fails when the connection succeeds or fails
  // and closed is a Future[Done] with the stream completion from the incoming sink
  val (upgradeResponse, closed) =
    serverOutgoing
      .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
      .toMat(serverIncoming)(Keep.both) // also keep the Future[Done]
      .run()

  // just like a regular http request we can access response status which is available via upgrade.response.status
  // status code 101 (Switching Protocols) indicates that server support WebSockets
  val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
  }

  // in a real application you would not side effect here
  connected.onComplete(println)
  closed.foreach(_ => println("closed"))

  val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid))

  val finalFlow = {
    val sink = Sink.actorRef(actor, akka.actor.Status.Success(()))
    val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ???
    Flow.fromSinkAndSource(sink, source)
  }

  finalFlow
}
With this code, the traffic goes from the Client to the Proxy to the Server and back to the Proxy, but that's it. It never reaches the Client. How can I fix this?
I think I need to somehow connect the serverIncoming sink to the source in finalFlow, but I can't figure out how to do it...
Or am I totally wrong with this approach? Is it better to use a BidiFlow or a Graph? I'm new to Akka Streams and still trying to figure things out.
The following seems to work. Note: I've implemented both the server socket and the proxy socket in the same controller, but you can split them or deploy the same controller on separate instances. The ws url to the 'upper' service will need to be updated in both cases.
package controllers

import javax.inject._

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import play.api.libs.streams.ActorFlow
import play.api.mvc._

import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps

@Singleton
class SomeController @Inject()(implicit exec: ExecutionContext,
                               actorSystem: ActorSystem,
                               materializer: Materializer) extends Controller {

  /*--- proxy ---*/

  // A def, not a val: a WebSocket client flow can only be materialized once,
  // so each incoming client connection gets its own flow to the upper service.
  def websocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/upper-socket"))

  // Client strings are wrapped as text messages, sent through the upstream
  // WebSocket, and the replies are unwrapped back into strings for the client.
  def proxySocket: WebSocket = WebSocket.accept[String, String] { _ =>
    Flow[String].map(s => TextMessage(s))
      .via(websocketFlow)
      .map(_.asTextMessage.getStrictText) // assumes the server replies with strict text messages
  }

  /*--- server ---*/

  // Replies to every incoming string with its uppercased version.
  class UpperService(socket: ActorRef) extends Actor {
    override def receive: Receive = {
      case s: String => socket ! s.toUpperCase()
      case _ =>
    }
  }

  object UpperService {
    def props(socket: ActorRef): Props = Props(new UpperService(socket))
  }

  def upperSocket: WebSocket = WebSocket.accept[String, String] { _ =>
    ActorFlow.actorRef(out => UpperService.props(out))
  }
}
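One caveat about the proxy flow above: as far as I know, `asTextMessage` fails for binary messages and `getStrictText` fails for streamed (non-strict) text messages. If the upstream server might send either, a conversion stage along the following lines could replace the final `.map(...)`. This is only a sketch; `MessageConversions` and `messageToString` are names made up for illustration, not part of Play or Akka HTTP.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}

import scala.concurrent.Future

// Hypothetical helper, not an Akka HTTP or Play API.
object MessageConversions {

  // Converts WebSocket messages to plain strings:
  //  - strict text messages are passed through as-is
  //  - streamed text messages are collected into a single string
  //  - binary messages are drained and dropped
  def messageToString(implicit mat: Materializer): Flow[Message, String, NotUsed] =
    Flow[Message]
      .mapAsync[Option[String]](1) {
        case TextMessage.Strict(text) =>
          Future.successful(Option(text))
        case streamed: TextMessage =>
          streamed.textStream
            .runFold("")(_ + _)
            .map(text => Option(text))(mat.executionContext)
        case binary: BinaryMessage =>
          // The data stream has to be consumed, otherwise the connection can stall.
          binary.dataStream
            .runWith(Sink.ignore)
            .map(_ => Option.empty[String])(mat.executionContext)
      }
      .collect { case Some(text) => text }
}
```

With that in place, the proxy body would read `Flow[String].map(s => TextMessage(s)).via(websocketFlow).via(MessageConversions.messageToString)`. Note that collecting a streamed message into one string buffers it fully in memory, which is fine for small payloads but may not suit large ones.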
You will need the routes to be set up like this:
GET /upper-socket controllers.SomeController.upperSocket
GET /proxy-socket controllers.SomeController.proxySocket
You can test by sending a string to ws://localhost:9000/proxy-socket. The answer will be the uppercased string.
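If you don't have a WebSocket client at hand, the check can also be done from Scala with the Akka HTTP client. The following is a rough sketch, not a definitive test harness: `ProxySocketCheck` and `sendAndAwaitReply` are hypothetical names, and it assumes the reply is a small text message.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Future

// Hypothetical helper for a manual check of the proxy.
object ProxySocketCheck {

  // Sends one text message to `url` and returns the text of the first reply.
  def sendAndAwaitReply(url: String, text: String)
                       (implicit system: ActorSystem, mat: Materializer): Future[String] = {
    import system.dispatcher

    // Send a single message, then keep the outgoing side open with Source.maybe
    // so the connection is not closed before the reply arrives.
    val outgoing =
      Source.single[Message](TextMessage(text))
        .concatMat(Source.maybe[Message])(Keep.right)

    val clientFlow = Flow.fromSinkAndSourceMat(Sink.head[Message], outgoing)(Keep.both)

    val (_, (firstReply, keepOpen)) =
      Http().singleWebSocketRequest(WebSocketRequest(url), clientFlow)

    val replyText = firstReply.flatMap {
      case tm: TextMessage => tm.textStream.runFold("")(_ + _)
      case other => Future.failed(new IllegalStateException(s"Expected a text message, got: $other"))
    }

    // Complete the outgoing side once the reply (or a failure) is in.
    replyText.onComplete(_ => keepOpen.trySuccess(None))
    replyText
  }
}
```

Calling `ProxySocketCheck.sendAndAwaitReply("ws://localhost:9000/proxy-socket", "hello")` with the application running should yield a future holding the uppercased string.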
There will be a timeout after 1 minute of inactivity though:
akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [localhost:9000], no bytes passed in the last 1 minute
See http://doc.akka.io/docs/akka-http/current/scala/http/common/timeouts.html for how to configure this.
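Since the exception comes from the proxy's outgoing connection, one option is to pass custom client connection settings when creating the WebSocket client flow instead of changing the global configuration. A minimal sketch, assuming a 10 minute idle timeout is acceptable; `ProxyClientSettings` and `longIdleSettings` are illustrative names, and the duration is an arbitrary choice:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, WebSocketRequest, WebSocketUpgradeResponse}
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.scaladsl.Flow

import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical holder object; in the controller these could simply be members.
object ProxyClientSettings {

  // Start from the configured defaults and only raise the idle timeout
  // (10 minutes is an assumed value, pick what fits your use case).
  def longIdleSettings(implicit system: ActorSystem): ClientConnectionSettings =
    ClientConnectionSettings(system).withIdleTimeout(10.minutes)

  // Same client flow as in the proxy, but with the custom settings applied.
  def websocketFlow(implicit system: ActorSystem): Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(
      WebSocketRequest("ws://localhost:9000/upper-socket"),
      settings = longIdleSettings)
}
```

The same effect should be achievable globally through the `akka.http.client.idle-timeout` configuration key. The server side has its own idle timeout, which may need adjusting as well if connections are expected to stay silent for long periods.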