Server code :
object EchoService {
def route: Route = path("ws-echo") {
get {
handleWebSocketMessages(flow)
}
} ~ path("send-client") {
get {
sourceQueue.map(q => {
println(s"Offering message from server")
q.offer(BinaryMessage(ByteString("ta ta")))
} )
complete("Sent from server successfully")
}
}
val (source, sourceQueue) = {
val p = Promise[SourceQueue[Message]]
val s = Source.queue[Message](100, OverflowStrategy.backpressure).mapMaterializedValue(m => {
p.trySuccess(m)
m
})
(s, p.future)
}
val flow =
Flow.fromSinkAndSourceMat(Sink.ignore, source)(Keep.right)
}
Client Code :
object Client extends App {
implicit val actorSystem = ActorSystem("akka-system")
implicit val flowMaterializer = ActorMaterializer()
val config = actorSystem.settings.config
val interface = config.getString("app.interface")
val port = config.getInt("app.port")
// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
case message: TextMessage.Strict =>
println(message.text)
case _ => println(s"received unknown message format")
}
val (source, sourceQueue) = {
val p = Promise[SourceQueue[Message]]
val s = Source.queue[Message](100, OverflowStrategy.backpressure).mapMaterializedValue(m => {
p.trySuccess(m)
m
})
(s, p.future)
}
val flow =
Flow.fromSinkAndSourceMat(printSink, source)(Keep.right)
val (upgradeResponse, sourceClosed) =
Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws-echo"), flow)
val connected = upgradeResponse.map { upgrade =>
// just like a regular http request we can get 404 NotFound,
// with a response body, that will be available from upgrade.response
if (upgrade.response.status == StatusCodes.SwitchingProtocols || upgrade.response.status == StatusCodes.OK ) {
Done
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
connected.onComplete(println)
}
when i hit http://localhost:8080/send-client
i see messages coming to client but after a while if try to send to client again i don't see any messages on client side :s . I also tried source.concatMat(Source.maybe)(Keep.right)
but no luck :(
Edit : I tested with js client, somehow connection/flow closed on server end , is there anyway to prevent this ? and how can i listen to this event while using akka-http websocket client :s
WebSocket uses HTTP as the initial transport mechanism, but keeps the TCP connection alive after the HTTP response is received so that it can be used for sending messages between client and server. WebSockets allow us to build “real-time” applications without the use of long-polling.
However, the connection between a client and your WebSocket app closes when no traffic is sent between them for 60 seconds.
1 WebSockets are open as long as you want and are not hackish (like long-polling and other alternatives).
Hi,
The reason why it does not keep connected is because by default all HTTP connections have idle-timeout on by default to keep the system from leaking connections if clients disappear without any signal.
One way to overcome this limitation (and actually my recommended approach) is to inject keep-alive messages on the client side (messages that the server otherwise ignore, but informs the underlying HTTP server that the connection is still live).
You can override the idle-timeouts in the HTTP server configuration to a larger value but I don't recommend that.
If you are using stream based clients, injecting heartbeats when necessary is as simple as calling
keepAlive
and providing it a time interval and a factory for the message you want to inject: http://doc.akka.io/api/akka/2.4.7/index.html#akka.stream.scaladsl.Flow@keepAliveU>:Out:FlowOps.this.Repr[U]That combinator will make sure that no periods more than T will be silent as it will inject elements to keep this contract if necessary (and will not inject anything if there is enough background traffic)
-Endre
thank you Endre :) , working snippet ..
// on client side
val (source, sourceQueue) = {
val p = Promise[SourceQueue[Message]]
val s = Source.queue[Message](Int.MaxValue, OverflowStrategy.backpressure).mapMaterializedValue(m => {
p.trySuccess(m)
m
}).keepAlive(FiniteDuration(1, TimeUnit.SECONDS), () => TextMessage.Strict("Heart Beat"))
(s, p.future)
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With