/**
 * Channel-based view of a single web socket connection.
 *
 * Both directions carry messages as plain [String]s.
 */
interface WebSocketSession {
    /** Sink for messages that should be written to the socket. */
    val outgoing: SendChannel<String>

    /** Source of messages that were read from the socket. */
    val incoming: ReceiveChannel<String>
}
/**
 * Opens a web socket for [request] and runs [block] against a channel-based [WebSocketSession].
 *
 * Text messages delivered to the listener are forwarded to [WebSocketSession.incoming]; every
 * string sent to [WebSocketSession.outgoing] is written with [WebSocket.send]. Both channels are
 * created without a buffer, so the thread that invokes `onMessage` blocks until [block] receives
 * the message.
 *
 * When [block] finishes (normally or with an exception) the outgoing channel is cancelled, the
 * incoming channel is cancelled so that a listener thread still waiting to hand over a message is
 * released, and the socket is closed with code 1000. The call returns once the sender coroutine
 * has completed.
 *
 * A listener failure closes `incoming` with the failure as its cause, so it surfaces to [block]
 * when it next receives from `incoming`.
 *
 * @param request the request used to open the web socket.
 * @param block the session body; its result becomes the result of this call.
 * @return the value produced by [block].
 */
suspend fun <R> OkHttpClient.newWebSocket(
    request: Request,
    block: suspend WebSocketSession.() -> R
): R = coroutineScope {
    val incoming = Channel<String>()
    val webSocket = newWebSocket(
        request,
        object : WebSocketListener() {
            override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
                // Propagate the failure to whoever is receiving from `incoming`.
                incoming.close(t)
            }

            override fun onMessage(webSocket: WebSocket, text: String) {
                try {
                    // Blocks this listener thread until the session body receives the message.
                    incoming.sendBlocking(text)
                } catch (ignored: java.util.concurrent.CancellationException) {
                    // The session body has already finished and cancelled `incoming`; nobody is
                    // left to read this message, so drop it instead of failing the socket while
                    // the close handshake is still in progress.
                }
            }

            override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                incoming.close()
            }

            override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                incoming.close()
            }
        }
    )
    val outgoing = Channel<String>()
    // Sender coroutine: drains `outgoing` into the socket and always closes the socket when the
    // channel is closed or cancelled.
    launch {
        try {
            outgoing.consumeEach { webSocket.send(it) }
        } finally {
            webSocket.close(1000, null)
        }
    }
    val session = object : WebSocketSession {
        override val incoming: ReceiveChannel<String> = incoming
        override val outgoing: SendChannel<String> = outgoing
    }
    try {
        // `consume` cancels `outgoing` once the body is done, which stops the sender coroutine.
        outgoing.consume { block(session) }
    } finally {
        // Without this, a listener thread parked in `sendBlocking` would wait forever for a
        // receiver that no longer exists once the session body has returned.
        incoming.cancel()
    }
}