I have been working on a sample Ktor websocket application for some time. What I have right now is a desktop app hosting the websocket server and an Android app acting as the client.
I know that the server can keep track of its clients, so I can tell whether a particular client is connected. For instance, if the Android app is closed while connected to the server, the server immediately detects that the connection is lost.
Here's a code snippet demonstrating this:
    routing {
        webSocket("/endpoint") {
            try {
                // Connection established
            } catch (e: Exception) {
                // Exceptions
            } finally {
                // Connection is lost
            }
        }
    }
When a client app loses internet, or is closed, the server instance of the socket connection immediately goes into the finally block. With that we can identify that the connection with that client is lost.
Now what if the reverse happened? What if the server app lost internet or was closed? How can the client identify this as a connection loss?
I couldn't find anything about this in the docs. Here's the relevant client code that I have written:
    webSocketSession = client.webSocketSession(
        method = HttpMethod.Get,
        host = hostUrl,
        port = port,
        path = "endpoint"
    )
I couldn't find any callbacks that can be attached to the webSocketSession object to help with this. Even webSocketSession.send("message") doesn't throw any errors. It just fails quietly.
Is there any way the client can be notified when the server is down? Failing that, is it possible to get some sort of status from the webSocketSession.send() call?
I really need to know when the server is down, so I can try to re-establish the connection from the client side.
Thanks.
To get notified that a connection to a server is lost, you can catch the ClosedReceiveChannelException while trying to receive messages:
    val client = HttpClient(CIO) {
        install(WebSockets) {}
    }
    client.ws("ws://localhost:6060/") {
        try {
            // receive() throws once the incoming channel has been closed
            while (true) { incoming.receive() }
        } catch (e: ClosedReceiveChannelException) {
            // Connection is lost
        }
    }
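Since the goal in the question is to re-establish the connection, the same idea can be wrapped in a loop. The following is only a minimal sketch, assuming Ktor 2.x or later package names; `listenWithReconnect`, `url` and `retryDelayMillis` are hypothetical names chosen for illustration, and the fixed pause is a placeholder for whatever retry policy you prefer.

```kotlin
import io.ktor.client.HttpClient
import io.ktor.client.plugins.websocket.webSocket
import io.ktor.websocket.Frame
import io.ktor.websocket.readText
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay

// Hypothetical helper: keeps a websocket open and reconnects whenever it drops.
// The HttpClient passed in is assumed to have the WebSockets plugin installed.
suspend fun listenWithReconnect(
    client: HttpClient,
    url: String,                    // e.g. "ws://localhost:6060/"
    retryDelayMillis: Long = 2_000  // assumed fixed pause between attempts
) {
    while (true) {
        try {
            client.webSocket(url) {
                // Iterating the channel ends when the channel is closed
                for (frame in incoming) {
                    if (frame is Frame.Text) println(frame.readText())
                }
            }
            // Reaching this point means the session ended without an exception
        } catch (e: CancellationException) {
            throw e // never swallow coroutine cancellation
        } catch (e: Exception) {
            // Connection refused, closed abnormally, host unreachable, ...
            println("Connection lost: ${e.message}")
        }
        delay(retryDelayMillis) // wait before trying to connect again
    }
}
```

The function only returns when the calling coroutine is cancelled, so it is meant to be launched in a scope you control, for example one tied to the screen or service that needs the connection.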
I had a slightly different use case: I wanted to collect a flow so that I could call sendSerialized while at the same time calling receiveDeserialized and emitting the received items in a flow, and I wanted to be able to reconnect if the client's connection gets interrupted.
I couldn't get it to work properly without wrapping both the sendSerialized and receiveDeserialized calls in a coroutineScope; otherwise the ClosedReceiveChannelException would escape and cause the entire flow to fail and never recover.
Here's an example of how to achieve this:
    @Serializable
    data class MessageDto(
        @SerialName(value = "id")
        val id: String,
        @SerialName(value = "message")
        val message: String
    )
    class MessageService constructor(
        private val httpClient: HttpClient
    ) {
        private val sendMessageFlow: MutableSharedFlow<MessageDto> = MutableSharedFlow()

        // emit() suspends, so this has to be a suspend function
        suspend fun sendMessage(messageDto: MessageDto) {
            sendMessageFlow.emit(messageDto)
        }

        fun messages(): Flow<MessageDto> = flow {
            httpClient.wss(
                method = HttpMethod.Get,
                host = getHostname(),
                port = getPort(),
                path = "/v1/messages"
            ) {
                // If the receive loop fails, the scope cancels the sender and rethrows
                coroutineScope {
                    sendMessageFlow
                        .conflate()
                        .onEach { sendSerialized(it) }
                        .launchIn(this)
                    while (true) {
                        emit(receiveDeserialized<MessageDto>())
                    }
                }
            }
        }.retryWithBackoff()
            .flowOn(Dispatchers.Default)
    }
    inline val defaultExceptions: Set<Class<out Exception>>
        get() = setOf(
            ClosedReceiveChannelException::class.java,
            ConnectException::class.java,
            UnresolvedAddressException::class.java
        )
/**
* Retries a `Flow<T>` indefinitely with exponential backoff
*
* @param delay -- delay in seconds between retries
* @param delayAtMost -- maximum delay in seconds after which this value is used to delay
*/
    public fun <T> Flow<T>.retryWithBackoff(
        delay: Int = 1,
        delayAtMost: Int = 10,
        exceptions: Set<Class<out Exception>> = defaultExceptions
    ): Flow<T> {
        var _delay = delay
        return retryWhen { cause, attempt ->
            val shouldRetry = exceptions.any { (cause::class.java == it) }
            if (shouldRetry) {
                delay(_delay.seconds.inWholeMilliseconds)
                val next = _delay * 2
                _delay = if (next < delayAtMost) {
                    next
                } else delayAtMost
                true
            } else false
        }
    }
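To make the doubling-with-cap rule used by retryWithBackoff easier to see, here is a small standalone sketch that lists the waits it would produce. `backoffSchedule` and its parameter names are hypothetical and not part of the code above; the function only mirrors the arithmetic and does no retrying itself.

```kotlin
// Hypothetical helper for illustration: returns the waits (in seconds) that a
// doubling backoff capped at `atMost` produces for the given number of retries.
fun backoffSchedule(initial: Int = 1, atMost: Int = 10, retries: Int = 6): List<Int> {
    val waits = mutableListOf<Int>()
    var current = initial
    repeat(retries) {
        waits += current                     // wait used for this retry
        current = minOf(current * 2, atMost) // double it, but never exceed the cap
    }
    return waits
}
```

With the defaults (start at 1 second, cap at 10 seconds), `backoffSchedule()` returns `[1, 2, 4, 8, 10, 10]`: the wait doubles until it reaches the cap and then stays there.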