Spring에서 코루틴으로 http2 전이중 통신 구현
http2(이하 h2)는 기존과 달리 헤더 압축과 프레임 단위 전송을 도입해 성능을 크게 개선했다. 모든 데이터는 하나의 TCP 연결 위에서 다중 스트림으로 교차 처리되며, 서버는 요청을 기다리지 않고 준비된 응답을 먼저 보낼 수 있다. 이로써 클라이언트와 서버는 서로 독립적으로 데이터를 주고받는 전이중 통신을 실현한다. 스프링프레임웍은 MVC로는 이러한 전이중(duplex)의 구조를 활용할 수 없고 웹플럭스기반의 아키텍쳐에서만 사용할 수 있다. h2의 대표적인 기능 중엔 널리 알려진 SSE가 있다. 이는 http1.1의 롱폴링(long polling)과는 다르다. 확보된 TCP세션안에서 필요할 때 프레임을 전송하는 기술이다. 하지만 SSE조차 그저 응답시의 프레임을 사용할 뿐이다. 진정한 전이중은 요청과 응답을 별도로 처리할 수 있어야한다.
@Controller
class List(val db:DatabaseClient){
data class Request(val page:Int, val search:String)
@GetMapping("/list")
fun list(httpReq:ServerHttpRequest, httpRes:ServerHttpResponse):Flow<String> {
val request = JSON.parse(Request::class.java, httpReq.body)
return DTOList(db, request)
}
}
이 간단한 컨트롤러는 Flow
를 반환하고 이는 스프링 내부에서 SSE로 처리된다. 하지만 이런 고수준의 스프링 추상화는 전이중을 사용할 수 없는 구조로 만들어진다.
전이중이 코드로 표현된다면 요청과 응답이 완전히 다르게 작동해야한다. 위의 코드처럼 순차적인 동작으로 요청 처리 후 응답 이라는 구조 하에서는 성립하지 않는 것이다. 독립적인 동작을 위해 콜백 형태로 개편하여 요청과 응답이 완전히 독립적으로 동작하게 처리하자.
스프링 플럭스는 컨트롤러의 응답에 따라 해당 컨트롤러를 어떻게 처리할지 분기한다. 만약 Mono를 반환한다면 스프링은 컨트롤러에 아무런 조치를 취하지 않고 그저 컨트롤러 코드 그대로를 실행한다. 네띠(Netty)등의 저수준 통신을 제어해보자.
@Controller
class List(val db:DatabaseClient){
data class Request(val page:Int, val search:String)
@GetMapping("/list")
fun list(httpReq:ServerHttpRequest, httpRes:ServerHttpResponse):Mono<Void>{
return httpReq.body.filter{it.readableByteCount() > 0}.concatMap{dataBuffer->
Mono.fromCallable {
val byteBody: ByteArray = ByteArray(dataBuffer.readableByteCount()).also { dataBuffer.read(it) }
DataBufferUtils.release(dataBuffer)
val stringBody = String(byte, charset)
println("request: $stringBody")
}
}.then()
}
}
저수준 요청처리는 요청의 body
를 구독하는 것으로 이뤄진다. 이 Flux
는 매번 청크바이트를 도착할 때마다 준다. concatMap
은 순서를 보장하면서도 비동기 처리를 추가로 할 수 있으므로 무난한 선택이다.
하지만 스프링이 기본으로 제공하는 많은 기능을 생략하고 있으므로 직접 구현해야 한다. 대표적인 저수준 처리는 다음과 같다.
- 각 청크별 UTF8 서로게이트 쌍(surrogate pair) 깨짐처리
- 요청 헤더의 ContentType과 Method에 따른 별도의 처리
본 장에서는 이런 상세 구현은 생략하고 요청과 응답을 전이중으로 처리하는데 집중한다. 요청의 스트림과 별도로 응답을 처리하기 위해서는 ServerHttpResponse로 부터 출력 스트림을 생성해야 한다.
@Controller
class List(val db:DatabaseClient) {
data class Request(val page: Int, val search: String)
var charset:Charset = StandardCharsets.UTF_8
@GetMapping("/list")
fun list(httpReq: ServerHttpRequest, httpRes: ServerHttpResponse): Mono<Void> {
//응답 스트림을 별도로 생성하고 Sink를 통해 처리함
var responseSink:Sinks.Many<ByteArray> = Sinks.many().unicast().onBackpressureBuffer()
httpRes.writeAndFlushWith(responseSink.asFlux().map{Mono.just(dataBufferFactory.wrap(it))}).subscribe()
//응답 1
responseSink.tryEmitNext("hello1".toByteArray(charset))
return httpReq.body.filter { it.readableByteCount() > 0 }.concatMap { dataBuffer ->
Mono.fromCallable {
val byteBody: ByteArray = ByteArray(dataBuffer.readableByteCount()).also { dataBuffer.read(it) }
DataBufferUtils.release(dataBuffer)
val stringBody = String(byte, charset)
println("request: $stringBody")
//청크 도착시 마다 응답
responseSink.tryEmitNext("request: $stringBody".toByteArray(charset))
}
}.then()
}
}
응답 스트림은 ServerHttpResponse
의 writeAntFlushWith
로 손쉽게 생성할 수 있다. 위 코드에서처럼 그 생성과 사용은 ServerHttpRequest
과 완전히 분리되어 있으며 요청이 오기 전, 오는 중, 다 온 뒤 등 아무 때나 응답을 보낼 수 있다.
또한 한 번이 아닌 여러 번 자유롭게 보낼 수 있다. 바로 이 부분이 http2
의 전이중 특성을 나타낸다. 이제 이를 감싸 메소드와 리스너를 제공하는 클래스를 생성하자.
지금까지 언급한 전이중 처리를 위임할 클래스를 작성한다.
class H2Controller private constructor(private val httpReq: ServerHttpRequest){
companion object{
var charset:Charset = StandardCharsets.UTF_8
operator fun invoke(httpReq: ServerHttpRequest, httpRes: ServerHttpResponse, block:H2Controller.()->Unit):Mono<Void>{
val controller = H2Controller(httpReq, httpRes)
block(controller)
httpRes.writeAndFlushWith(controller.responseSink.asFlux().map{Mono.just(dataBufferFactory.wrap(it))}).subscribe()
httpReq.body.filter { it.readableByteCount() > 0 }.concatMap { dataBuffer ->
Mono.fromCallable {
val byteBody: ByteArray = ByteArray(dataBuffer.readableByteCount()).also { dataBuffer.read(it) }
DataBufferUtils.release(dataBuffer)
controller.requestBlock?.invoke(String(byte, charset))
}
}.then()
}
}
private var responseSink:Sinks.Many<ByteArray> = Sinks.many().unicast().onBackpressureBuffer()
private var requestBlock:((String)->Unit)? = null
fun response(data:String){
responseSink.tryEmitNext(data.toByteArray(charset))
}
fun request(block:(String)->Unit){
requestBlock = block
}
}
매 요청시마다 요청, 응답 객체로부터 간단한 DSL을 통해 요청콜백과 응답을 처리하는 메소드를 활용하게 한다.
@Controller
class List(val db:DatabaseClient) {
data class Request(val page: Int, val search: String)
@GetMapping("/list")
fun list(httpReq: ServerHttpRequest, httpRes: ServerHttpResponse): Mono<Void> = H2Controller(httpReq, httpRes) {
response("hello1")
request{
response("request: $it")
}
}
}
이제 개별 컨트롤러의 전이중 처리가 매우 간단해 졌다. 하지만 여전히 suspend
와는 결합하지 못했다.
스프링 리액터는 코틀린과의 연동을 위해 특수한 mono
함수를 제공한다. 이 함수를 이용하면 suspend
로 된 내용을 손쉽게 Mono
로 환원하여 리액터와 연동이 손쉬워진다.
class H2Controller private constructor(private val httpReq: ServerHttpRequest){
companion object{
var charset:Charset = StandardCharsets.UTF_8
suspend operator fun invoke(httpReq: ServerHttpRequest, httpRes: ServerHttpResponse, block:suspend H2Controller.()->Unit):Mono<Void>{
val controller = H2Controller(httpReq, httpRes)
block(controller)
httpRes.writeAndFlushWith(controller.responseSink.asFlux().map{Mono.just(dataBufferFactory.wrap(it))}).subscribe()
httpReq.body.filter { it.readableByteCount() > 0 }.concatMap { dataBuffer ->
Mono.fromCallable {
val byteBody: ByteArray = ByteArray(dataBuffer.readableByteCount()).also { dataBuffer.read(it) }
DataBufferUtils.release(dataBuffer)
}
}.then(mono{
controller.requestBlock?.invoke(String(byte, charset))
}).then()
}
}
private var responseSink:Sinks.Many<ByteArray> = Sinks.many().unicast().onBackpressureBuffer()
private var requestBlock:(suspend (String)->Unit)? = null
fun response(data:String){
responseSink.tryEmitNext(data.toByteArray(charset))
}
fun request(block:suspend (String)->Unit){
requestBlock = block
}
}
개선된 H2Controller
는 전달되는 block
에서 요청 콜백까지 전부 suspend
를 지원한다.
@Controller
class List(val db:DatabaseClient) {
data class Request(val page: Int, val search: String)
@GetMapping("/list")
fun list(httpReq: ServerHttpRequest, httpRes: ServerHttpResponse): Mono<Void> = H2Controller(httpReq, httpRes) {
suspendedService()
response("hello1")
request{
suspendedService()
response("request: $it")
}
}
}
이제 컨트롤러는 매우 자연스럽게 suspend
를 전이중 처리에 녹여넣을 수 있게 되었다.
네띠는 이미 전이중에 기반한 처리를 내부에 지원하지만 스프링의 전형적인 컨트롤러 시그니처에서는 지원되지 않는다. 진정한 전이중 기반의 컨트롤러는 Mono<Void>
반환으로 구현할 수 있으며 코루틴의 suspend
와도 통합할 수 있다.