Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to keep connection open for all the time in websockets

Server code :

object EchoService {

  def route: Route = path("ws-echo") {
    get {
      handleWebSocketMessages(flow)
    }
  } ~ path("send-client") {
    get {
      sourceQueue.map(q => {
        println(s"Offering message from server")
        q.offer(BinaryMessage(ByteString("ta ta")))
      } )
      complete("Sent from server successfully")
    }
  }

  val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](100, OverflowStrategy.backpressure).mapMaterializedValue(m => {
      p.trySuccess(m)
      m
    })
    (s, p.future)
  }

  val flow =
    Flow.fromSinkAndSourceMat(Sink.ignore, source)(Keep.right)
}

Client Code :

object Client extends App {


  implicit val actorSystem = ActorSystem("akka-system")
  implicit val flowMaterializer = ActorMaterializer()

  val config = actorSystem.settings.config
  val interface = config.getString("app.interface")

  val port = config.getInt("app.port")


  // print each incoming strict text message
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach {
      case message: TextMessage.Strict =>
        println(message.text)

      case _ => println(s"received unknown message format")
    }

  val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](100, OverflowStrategy.backpressure).mapMaterializedValue(m => {
      p.trySuccess(m)
      m
    })
    (s, p.future)
  }

  val flow =
    Flow.fromSinkAndSourceMat(printSink, source)(Keep.right)


  val (upgradeResponse, sourceClosed) =
    Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws-echo"), flow)

  val connected = upgradeResponse.map { upgrade =>
    // just like a regular http request we can get 404 NotFound,
    // with a response body, that will be available from upgrade.response
    if (upgrade.response.status == StatusCodes.SwitchingProtocols || upgrade.response.status == StatusCodes.OK ) {
      Done
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
  }

  connected.onComplete(println)

}

when i hit http://localhost:8080/send-client i see messages coming to client but after a while if try to send to client again i don't see any messages on client side :s . I also tried source.concatMat(Source.maybe)(Keep.right) but no luck :(

Edit : I tested with js client, somehow connection/flow closed on server end , is there anyway to prevent this ? and how can i listen to this event while using akka-http websocket client :s

like image 431
invariant Avatar asked Jun 09 '16 03:06

invariant


People also ask

How does WebSocket keep connection open?

WebSocket uses HTTP as the initial transport mechanism, but keeps the TCP connection alive after the HTTP response is received so that it can be used for sending messages between client and server. WebSockets allow us to build “real-time” applications without the use of long-polling.

How long can you keep a WebSocket open?

However, the connection between a client and your WebSocket app closes when no traffic is sent between them for 60 seconds.

Is WebSocket always open?

1 WebSockets are open as long as you want and are not hackish (like long-polling and other alternatives).


1 Answers

Hi,

The reason why it does not keep connected is because by default all HTTP connections have idle-timeout on by default to keep the system from leaking connections if clients disappear without any signal.

One way to overcome this limitation (and actually my recommended approach) is to inject keep-alive messages on the client side (messages that the server otherwise ignore, but informs the underlying HTTP server that the connection is still live).

You can override the idle-timeouts in the HTTP server configuration to a larger value but I don't recommend that.

If you are using stream based clients, injecting heartbeats when necessary is as simple as calling keepAlive and providing it a time interval and a factory for the message you want to inject: http://doc.akka.io/api/akka/2.4.7/index.html#akka.stream.scaladsl.Flow@keepAliveU>:Out:FlowOps.this.Repr[U]

That combinator will make sure that no periods more than T will be silent as it will inject elements to keep this contract if necessary (and will not inject anything if there is enough background traffic)

-Endre

thank you Endre :) , working snippet ..

// on client side 

 val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](Int.MaxValue, OverflowStrategy.backpressure).mapMaterializedValue(m => {
      p.trySuccess(m)
      m
    }).keepAlive(FiniteDuration(1, TimeUnit.SECONDS), () => TextMessage.Strict("Heart Beat"))
    (s, p.future)
  }
like image 70
invariant Avatar answered Oct 11 '22 15:10

invariant