interface WebSocketSession {
  val incoming: ReceiveChannel<String>
  val outgoing: SendChannel<String>
}

suspend fun <R> OkHttpClient.newWebSocket(
  request: Request,
  block: suspend WebSocketSession.() -> R
): R = coroutineScope {
  val incoming = Channel<String>()
  val webSocket = newWebSocket(
    request,
    object : WebSocketListener() {
      override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
        incoming.close(t)
      }

      override fun onMessage(webSocket: WebSocket, text: String) {
        incoming.sendBlocking(text)
      }

      override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
        incoming.close()
      }

      override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
        incoming.close()
      }
    }
  )

  val outgoing = Channel<String>()
  launch {
    try {
      outgoing.consumeEach { webSocket.send(it) }
    } finally {
      webSocket.close(1000, null)
    }
  }

  val session = object : WebSocketSession {
    override val incoming: ReceiveChannel<String> = incoming
    override val outgoing: SendChannel<String> = outgoing
  }

  outgoing.consume { block(session) }
}