코루틴과 Flow
코틀린의 대표적인 동시성 철학이 코루틴이다. 코루틴은 워커쓰레드패턴(Worker Thread Pattern)에 기반을 둔 구조로 동기화를 이루는 작업큐에 원하는 작업을 등록하면 여러 워커역할을 수행하는 쓰레드에서 이 큐의 작업을 가져서 처리하는 방식을 말한다. 흔히 이벤트루프(event loop)로도 알려져있으며 동기화 문제를 효과적으로 해결하면서도 비동기적인 상항을 마치 동기적인 파이프라인처럼 처리할 수 있어 동시성의 복잡성을 크게 낮춘다. 코틀린은 언어 차원과 라이브러리 차원으로 코루틴을 지원하는데 언어 차원의 지원은 suspend함수의 파싱과 컨티뉴에이션(Continuation) 객체의 자동 생성이며 kotlinx.coroutine 라이브러리로는 스코프와 Flow 등을 지원한다. 이전에는 많은 동시성 구현을 스코프를 통해 처리했다. 하지만 현 시점에서는 대부분의 동시성 처리를 Flow로 할 수 있게 되면서 둘을 혼용하거나 아예 Flow만 쓰는 패턴으로 변해가고 있다. 이 장에서는 코루틴 스코프를 완전히 배제하고 기본 코루틴 기능과 Flow만으로 전개한다.
코틀린 컴파일러는 suspend함수를 일반함수로 변환한다.
suspend fun plus(a:Int, b:Int):Int = a + b
//변환결과
fun plus(a:Int, b:Int, cont:Continuation<*>):Unit = cont.resume(a + b)
class PlusContinuation(val parent:Continuation<*>):Continuation<Int>
스코프등에 포함된 코루틴 빌더는 사실 변환된 suspend함수를 호출하는 코드를 생성하는 것으로 주 내용은 적합한 컨티뉴에이션을 인자로 전달하는 것이다. 하지만 이는 직접 코드로 구현해도 무방하다.
private class Cont(override val context:CoroutineContext):Continuation<Unit> {
override fun resumeWith(result: Result<Unit>){}
}
fun launch(ctx:CoroutineContext = EmptyCoroutineContext, block:suspend ()->Unit) {
block.startCoroutine(Cont(ctx))
}
launch {
delay(1000)
println("Hello")
}
Thread.sleep(1100)
이 코드는 suspend
함수를 비 suspend
함수로 실행할 수 있게 해준다(보다 상세한 내용은 코틀린에 구현된 startCoroutine
의 상세 구현을 따라가보자)
하지만 launch
함수의 최소 구현은 언제 suspend
함수가 종료될지 예상할 수 없어 Thread.sleep
등으로 적당히 대기하는 식의 코드를 작성할 수 밖에 없다. 이를 보완하려면 await()
등의 메소드를 제공하여 대기할 수 있거나 콜백을 받아서 종료 시 호출될 수 있어야할 것이다.
Job
은 간단한 구조체로 그 자체로는 아무일도 안하지만 코루틴 스코프 라이브러리에서 상태를 관리하기 위한 객체로 사용된다. 코틀린 언어 자체에는 이런 기능을 하는 객체를 따로 정의하지 않고 있으므로 kotlinx.coroutine
의 Job
을 이용해 상태를 관리하자.
기존에 아무것도 안하는 컨티뉴에이션에 Job
으로 상태를 보고하는 기능을 추가하자.
private class JobCont(ctx:CoroutineContext):Continuation<Unit> {
val job = (ctx[Job] as? CompletableJob) ?: Job()
override val context = (ctx[Job] as? CompletableJob)?.let{ctx} ?: (ctx + job)
override fun resumeWith(result:Result<Unit>){
result.onSuccess { job.complete() }
.onFailure { job.completeExceptionally(it) }
}
}
fun launch(ctx:CoroutineContext = EmptyCoroutineContext, block:suspend ()->Unit):CompletableJob{
val cont = JobCont(ctx)
block.startCoroutine(cont)
return cont.job
}
launch {
delay(1000)
println("Hello")
}
Thread.sleep(1100)
우리가 이전과 다르게 취할 수 있는 정보는 JobCont
내의 job
속성이다. 하지만 CompletableJob
은 매우 로우레벨 객체로 이것으로부터 완료대기 등을 직접 사용할 때마다 구현하는 것은 무리다.
편의 기능을 구현하기 전에 현재 쓰레드를 폴링(Polling)하지 않고 wait set에 넣어서 대기할 방법이 필요하다. 이 방식으로 구현하지 않으면 쓰레드가 계속 혹사당한다.
JVM에서 이런 AQS(AbstractQueuedSynchronizer)를 이용한 가장 간단한 방법은 Latch
다.
class Latch actual constructor() {
private val latch = CountDownLatch(1)
fun await(timeout:Int):Boolean
= if(timeout == 0) {
latch.await()
true
}else latch.await(timeout.toLong(), TimeUnit.MILLISECONDS)
fun countDown() {
latch.countDown()
}
}
Latch
는 원하는 카운트에 이르면 풀려나는 구조며 타임아웃을 설정하는 경우는 시간 내에 카운트가 일어나면 true
아니면 false
로 종료된다. 즉 타임아웃을 건 Latch
는 매우 안전하게 쓰레드를 풀어준다.
또한 latch
대기는 쓰레드의 인터럽트에도 반응하여 중지할 수 있는 구조를 내장한다. 간단한 래퍼클래스를 이용해 CompletableJob
을 대기하는 await
메소드를 제공하자.
@JvmInline
value class LaunchHandle(val job:CompletableJob){
inline fun toFlow() = flow{
val e:Throwable? = suspendCoroutine {cont->
job.invokeOnCompletion {
cont.resume(it)
}
}
emit(e)
}
inline fun await(timeout:Int = 0):Throwable?{
val latch = Latch()
var err:Throwable? = null
job.invokeOnCompletion {
latch.countDown()
err = it
}
if(!latch.await(timeout)){
job.cancel(CancellationException("timeout"))
err = e
}
return err
}
inline fun await(noinline block:(Throwable?)->Unit){
block(await(0))
}
inline fun cancel(cause:String = ""):Throwable?{
if(!job.isCancelled && !job.isCompleted) job.cancel(CancellationException(cause))
return await()
}
inline fun cancel(cause:String = "", noinline block:(Throwable?)->Unit){
if(!job.isCancelled && !job.isCompleted) job.cancel(CancellationException(cause))
await(block)
}
}
2장에서 이미 배웠던 value class
를 이용하여 가볍게 래핑한다. 우선 Flow
화 할 때는 원래 반환 값이 없으므로 줄 수 있는 값은 오류 상황인지 뿐이다. 따라서 Throwable?
를 emit
하는 Flow
를 반환한다.
CompletableJob
은 complete()
호출 시 작동할 콜백을 invokeOnCompletion
으로 등록할 수 있다. 하지만 이는 비 suspend
컨텍스트다. 이를 Flow
내부의 suspend
의 흐름에 병합하려면 suspendCoroutine
을 경유해야 한다.
await
는 본격적으로 Latch
를 이용한다. job
의 complete()
시점에 카운트 다운을 실시하여 latch.await
의 대기를 풀어준다. 타임아웃인 경우는 cancel
처리한다.
이제 CompletableJob
을 감싸 await
를 손쉽게 처리할 수 있다.
private class JobCont(ctx:CoroutineContext):Continuation<Unit> {
val job = (ctx[Job] as? CompletableJob) ?: Job()
override val context = (ctx[Job] as? CompletableJob)?.let{ctx} ?: (ctx + job)
override fun resumeWith(result:Result<Unit>){
result.onSuccess { job.complete() }
.onFailure { job.completeExceptionally(it) }
}
}
fun launch(ctx:CoroutineContext = EmptyCoroutineContext, block:suspend ()->Unit):LaunchHandle{
val cont = JobCont(ctx)
block.startCoroutine(cont)
return LaunchHandle(cont.job)
}
launch {
delay(1000)
println("Hello")
}.await()
Flow
에서 여러 개의 비동기를 처리하는 핵심은 flatMap
이다. Flow
는 시간상 순차적으로 emit
되는 것으로 이해할 수 있다. 이 emit
된 값을 다시 flatMap
한다는 의미는 단일 값으로부터 Flow
를 만들어 collect
를 통해 비동기로 수집하겠다는 의미다.
List
의 flatMap
과는 완전히 다르며 원래 emit
의 순서가 보장되는 것도 아니다. 핵심은 각 emit
이 비동기로 다시 전개된다는 것이다.
val flow:Flow<String> = listOf(1, 2, 3, 4).asFlow().flatMapMerge(2){
flow{
delay(Random.nextInt(50, 201))
emit("$it")
}
}
launch{
print(flow.toList().joinToString(","))
}
이 결과는 실행할 때마다 변할 수 있다. 물론 toList
나 collect
는 모든 원소의 Flow
가 다 collect
되는 걸 보장한다. 가장 느린 원소의 Flow
처리가 최종 처리 시간이 될 것이다.
하지만 도착하는 순서대로 emit
하게 되므로 출력할 때마다 순서는 뒤죽박죽이 된다. 그래도 launch
는 큰 문제 없다. 원래 전체가 완료되는걸 대기하는 await
의 동작만 보장하지 그 외에 순서나 반환값을 기다리지 않기 때문이다.
이제 이 모든 걸을 합쳐 launchAll
을 만들 수 있다.
suspend inline fun <T:Any> Iterable<T>.launchAll(context:CoroutineContext = Dispatchers.Default, concurrency:Int = 16, noinline block:suspend (T)->Unit){
asFlow().flowOn(context).flatMapMerge(concurrency){
flow<Unit>{
block(it)
}
}.onCompletion(onComplete).collect()
}
launch{
listOf(1, 2, 3, 4).launchAll(2){
delay(Random.nextInt(50, 201))
println(it)
}
}.await()
이 구현은 List
의 각 원소를 일종의 시드(seed)로 이용해 병렬적인 비동기 실행을 만들어낸다. flatMapMerge
의 구현은 concurrency
인자에 맞춰 동시에 실행되는 쓰레드 수를 조정할 수 있다.
위 예제 실행은 1과 2 사이는 순서가 다를 수 있지만 1,2가 출력된 후 3,4가 출력된다. concurrency
를 2로 설정했기 때문에 2개의 작업이 끝나야 그 다음 2개의 작업을 실행하기 때문이다.
note
Flow의 다른 flatMap도 눈여겨 볼만하다. flatMapConcat은 사실상 concurrency에 1을 준 flatMapMerge와 비슷하다. 결국 대부분의 구현은 ChannelFlow 기반이다.
자바스크립트의 Promise.all
처럼 모든 처리 결과를 모아 하나의 List로 반환하는 awaitAll
을 만들어보자. 앞서 구현한 launchAll
과 결합하면 사실상 모든 동시성 시나리오리를 처리할 수 있다.
하지만 문제가 하나 있었다. flatMapMerge
가 원래 순서를 보장하지 않는다는 것이다. 이를 보정하는 방법은 최초의 인덱스를 보존하는 것이다. 코틀린은 List
의 원소를 인덱스와 묶은 원소로 바꿔서 새로운 List
를 만들어주는 withIndex()
메소드가 있다. 이를 활용하자.
val flow:Flow<String> = listOf(1, 2, 3, 4).withIndex().asFlow().flatMapMerge(2){(index, value)->
flow<Pair<Int, String>>{
delay(Random.nextInt(50, 201))
emit(index to "$value")
}
}
launch{
print(flow.toList().sortedBy{it.first}.map{it.second}.joinToString(","))
}
이 방법은 원래 인덱스를 같이 출력하므로 모두 collect
한 뒤 인덱스로 정렬하여 value
만 추출하는 방식을 사용한다.
suspend fun <T:Any, V:Any> List<T>.awaitAllIndexed(context:CoroutineContext = Dispatchers.Default, concurrency:Int = 16, noinline block:suspend (Int, T)->V):LF<V>{
val result:ArrayList<Pair<Int, V>> = arrayListOf()
withIndex().asFlow().flowOn(context).flatMapMerge(concurrency){
flow<Pair<Int, V>?>{block(it.index, it.value)?.let{v->emit(it.index to v)}}
}.collect{it?.let {v->result.add(v)}}
return result.sortedBy{it.first}.map{it.second}.toArrayList()
}
launch{
val result = listOf(url1, url2, url3, url4).awaitAllIndexed { index, url ->
flow{
WebClient.builder().baseUrl(url1).build().post()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(request[index])
.retrieve()
.awaitBody<String>()
}
}
println(result.joinToString("\n"))
}.await()
아주 손쉽게 여러 API질의를 동시에 처리하여 결과를 수집할 수 있다. 특히 concurrency를 조정하면 매우 손쉽게 동시처리 제한도 걸 수 있다.
코루틴은 매우 강력하지만 생각보다 많은 사전지식을 요구한다. 섬세하지 않게 쓰고 싶다면 충분한 추상화가 필요하다. Flow
는 결국 suspend
함수의 래퍼일 뿐이라 매우 가볍다.
기존 코틀린의 전략은 이러한 복잡성을 코루틴스코프에 감추고 스코프를 통해 동시성 제어를 하도록 권장했다. 하지만 스코프는 기본적으로 무겁고 여전히 복잡하며 실수하기 쉬운 구조로 되어있다.
이에 비해 위에 구현된 launch
, launchAll
, awaitAll
등을 이용하면 매우 손쉽게 통제하면서도 오버헤드는 최소화된다.