Skip to content

Instantly share code, notes, and snippets.

@hikaMaeng
Created August 26, 2025 09:07
Show Gist options
  • Save hikaMaeng/3bebbf2e218f108648d0817766fddccc to your computer and use it in GitHub Desktop.
Save hikaMaeng/3bebbf2e218f108648d0817766fddccc to your computer and use it in GitHub Desktop.

Chapter 3

코루틴과 Flow


3-1 배경

코틀린의 대표적인 동시성 철학이 코루틴이다. 코루틴은 워커쓰레드패턴(Worker Thread Pattern)에 기반을 둔 구조로 동기화를 이루는 작업큐에 원하는 작업을 등록하면 여러 워커역할을 수행하는 쓰레드에서 이 큐의 작업을 가져서 처리하는 방식을 말한다. 흔히 이벤트루프(event loop)로도 알려져있으며 동기화 문제를 효과적으로 해결하면서도 비동기적인 상항을 마치 동기적인 파이프라인처럼 처리할 수 있어 동시성의 복잡성을 크게 낮춘다. 코틀린은 언어 차원과 라이브러리 차원으로 코루틴을 지원하는데 언어 차원의 지원은 suspend함수의 파싱과 컨티뉴에이션(Continuation) 객체의 자동 생성이며 kotlinx.coroutine 라이브러리로는 스코프와 Flow 등을 지원한다. 이전에는 많은 동시성 구현을 스코프를 통해 처리했다. 하지만 현 시점에서는 대부분의 동시성 처리를 Flow로 할 수 있게 되면서 둘을 혼용하거나 아예 Flow만 쓰는 패턴으로 변해가고 있다. 이 장에서는 코루틴 스코프를 완전히 배제하고 기본 코루틴 기능과 Flow만으로 전개한다.

3-2 suspend함수

코틀린 컴파일러는 suspend함수를 일반함수로 변환한다.

suspend fun plus(a:Int, b:Int):Int = a + b

//변환결과
fun plus(a:Int, b:Int, cont:Continuation<*>):Unit = cont.resume(a + b)
class PlusContinuation(val parent:Continuation<*>):Continuation<Int>

스코프등에 포함된 코루틴 빌더는 사실 변환된 suspend함수를 호출하는 코드를 생성하는 것으로 주 내용은 적합한 컨티뉴에이션을 인자로 전달하는 것이다. 하지만 이는 직접 코드로 구현해도 무방하다.

private class Cont(override val context:CoroutineContext):Continuation<Unit> {
    override fun resumeWith(result: Result<Unit>){}
}

fun launch(ctx:CoroutineContext = EmptyCoroutineContext, block:suspend ()->Unit) {
    block.startCoroutine(Cont(ctx))
}

launch {
    delay(1000)
    println("Hello")
}

Thread.sleep(1100)

이 코드는 suspend 함수를 비 suspend함수로 실행할 수 있게 해준다(보다 상세한 내용은 코틀린에 구현된 startCoroutine의 상세 구현을 따라가보자)

하지만 launch함수의 최소 구현은 언제 suspend함수가 종료될지 예상할 수 없어 Thread.sleep등으로 적당히 대기하는 식의 코드를 작성할 수 밖에 없다. 이를 보완하려면 await() 등의 메소드를 제공하여 대기할 수 있거나 콜백을 받아서 종료 시 호출될 수 있어야할 것이다.

3-3 launch 구현과 Job

Job은 간단한 구조체로 그 자체로는 아무일도 안하지만 코루틴 스코프 라이브러리에서 상태를 관리하기 위한 객체로 사용된다. 코틀린 언어 자체에는 이런 기능을 하는 객체를 따로 정의하지 않고 있으므로 kotlinx.coroutineJob을 이용해 상태를 관리하자. 기존에 아무것도 안하는 컨티뉴에이션에 Job으로 상태를 보고하는 기능을 추가하자.

private class JobCont(ctx:CoroutineContext):Continuation<Unit> {
    val job = (ctx[Job] as? CompletableJob) ?: Job()
    override val context = (ctx[Job] as? CompletableJob)?.let{ctx} ?: (ctx + job)
    override fun resumeWith(result:Result<Unit>){
        result.onSuccess { job.complete() }
            .onFailure { job.completeExceptionally(it) }
    }
}
fun launch(ctx:CoroutineContext = EmptyCoroutineContext, block:suspend ()->Unit):CompletableJob{
    val cont = JobCont(ctx)
    block.startCoroutine(cont)
    return cont.job
}

launch {
    delay(1000)
    println("Hello")
}

Thread.sleep(1100)

우리가 이전과 다르게 취할 수 있는 정보는 JobCont내의 job속성이다. 하지만 CompletableJob은 매우 로우레벨 객체로 이것으로부터 완료대기 등을 직접 사용할 때마다 구현하는 것은 무리다. 편의 기능을 구현하기 전에 현재 쓰레드를 폴링(Polling)하지 않고 wait set에 넣어서 대기할 방법이 필요하다. 이 방식으로 구현하지 않으면 쓰레드가 계속 혹사당한다. JVM에서 이런 AQS(AbstractQueuedSynchronizer)를 이용한 가장 간단한 방법은 Latch다.

class Latch actual constructor() {
    
    private val latch = CountDownLatch(1)
    
    fun await(timeout:Int):Boolean
    = if(timeout == 0) {
        latch.await()
        true
    }else latch.await(timeout.toLong(), TimeUnit.MILLISECONDS)
    
    fun countDown() {
        latch.countDown()
    }
}

Latch는 원하는 카운트에 이르면 풀려나는 구조며 타임아웃을 설정하는 경우는 시간 내에 카운트가 일어나면 true아니면 false로 종료된다. 즉 타임아웃을 건 Latch는 매우 안전하게 쓰레드를 풀어준다. 또한 latch 대기는 쓰레드의 인터럽트에도 반응하여 중지할 수 있는 구조를 내장한다. 간단한 래퍼클래스를 이용해 CompletableJob을 대기하는 await메소드를 제공하자.

@JvmInline
value class LaunchHandle(val job:CompletableJob){
    inline fun toFlow() = flow{
        val e:Throwable? = suspendCoroutine {cont->
            job.invokeOnCompletion {
                cont.resume(it)
            }
        }
        emit(e)
    }
    inline fun await(timeout:Int = 0):Throwable?{
        val latch = Latch()
        var err:Throwable? = null
        job.invokeOnCompletion {
            latch.countDown()
            err = it
        }
        if(!latch.await(timeout)){
            job.cancel(CancellationException("timeout"))
            err = e
        }
        return err
    }
    inline fun await(noinline block:(Throwable?)->Unit){
        block(await(0))
    }
    inline fun cancel(cause:String = ""):Throwable?{
        if(!job.isCancelled && !job.isCompleted) job.cancel(CancellationException(cause))
        return await()
    }
    inline fun cancel(cause:String = "", noinline block:(Throwable?)->Unit){
        if(!job.isCancelled && !job.isCompleted) job.cancel(CancellationException(cause))
        await(block)
    }
}

2장에서 이미 배웠던 value class를 이용하여 가볍게 래핑한다. 우선 Flow화 할 때는 원래 반환 값이 없으므로 줄 수 있는 값은 오류 상황인지 뿐이다. 따라서 Throwable?emit하는 Flow를 반환한다. CompletableJobcomplete()호출 시 작동할 콜백을 invokeOnCompletion으로 등록할 수 있다. 하지만 이는 비 suspend컨텍스트다. 이를 Flow내부의 suspend의 흐름에 병합하려면 suspendCoroutine을 경유해야 한다.

await는 본격적으로 Latch를 이용한다. jobcomplete() 시점에 카운트 다운을 실시하여 latch.await의 대기를 풀어준다. 타임아웃인 경우는 cancel처리한다. 이제 CompletableJob을 감싸 await를 손쉽게 처리할 수 있다.

private class JobCont(ctx:CoroutineContext):Continuation<Unit> {
    val job = (ctx[Job] as? CompletableJob) ?: Job()
    override val context = (ctx[Job] as? CompletableJob)?.let{ctx} ?: (ctx + job)
    override fun resumeWith(result:Result<Unit>){
        result.onSuccess { job.complete() }
            .onFailure { job.completeExceptionally(it) }
    }
}
fun launch(ctx:CoroutineContext = EmptyCoroutineContext, block:suspend ()->Unit):LaunchHandle{
    val cont = JobCont(ctx)
    block.startCoroutine(cont)
    return LaunchHandle(cont.job)
}

launch {
    delay(1000)
    println("Hello")
}.await()

3-4 Flow와 flatMap

Flow에서 여러 개의 비동기를 처리하는 핵심은 flatMap이다. Flow는 시간상 순차적으로 emit되는 것으로 이해할 수 있다. 이 emit된 값을 다시 flatMap한다는 의미는 단일 값으로부터 Flow를 만들어 collect를 통해 비동기로 수집하겠다는 의미다. ListflatMap과는 완전히 다르며 원래 emit의 순서가 보장되는 것도 아니다. 핵심은 각 emit이 비동기로 다시 전개된다는 것이다.

val flow:Flow<String> = listOf(1, 2, 3, 4).asFlow().flatMapMerge(2){
    flow{
        delay(Random.nextInt(50, 201))
        emit("$it")
    }
}
launch{
    print(flow.toList().joinToString(","))
}

이 결과는 실행할 때마다 변할 수 있다. 물론 toListcollect는 모든 원소의 Flow가 다 collect되는 걸 보장한다. 가장 느린 원소의 Flow처리가 최종 처리 시간이 될 것이다. 하지만 도착하는 순서대로 emit하게 되므로 출력할 때마다 순서는 뒤죽박죽이 된다. 그래도 launch는 큰 문제 없다. 원래 전체가 완료되는걸 대기하는 await의 동작만 보장하지 그 외에 순서나 반환값을 기다리지 않기 때문이다. 이제 이 모든 걸을 합쳐 launchAll을 만들 수 있다.

suspend inline fun <T:Any> Iterable<T>.launchAll(context:CoroutineContext = Dispatchers.Default, concurrency:Int = 16, noinline block:suspend (T)->Unit){
    asFlow().flowOn(context).flatMapMerge(concurrency){
        flow<Unit>{
            block(it)
        }
    }.onCompletion(onComplete).collect()
}

launch{
    listOf(1, 2, 3, 4).launchAll(2){
        delay(Random.nextInt(50, 201))
        println(it)
    }
}.await()

이 구현은 List의 각 원소를 일종의 시드(seed)로 이용해 병렬적인 비동기 실행을 만들어낸다. flatMapMerge의 구현은 concurrency인자에 맞춰 동시에 실행되는 쓰레드 수를 조정할 수 있다. 위 예제 실행은 1과 2 사이는 순서가 다를 수 있지만 1,2가 출력된 후 3,4가 출력된다. concurrency를 2로 설정했기 때문에 2개의 작업이 끝나야 그 다음 2개의 작업을 실행하기 때문이다.

note

Flow의 다른 flatMap도 눈여겨 볼만하다. flatMapConcat은 사실상 concurrency에 1을 준 flatMapMerge와 비슷하다. 결국 대부분의 구현은 ChannelFlow 기반이다.

3-5 awaitAll

자바스크립트의 Promise.all처럼 모든 처리 결과를 모아 하나의 List로 반환하는 awaitAll을 만들어보자. 앞서 구현한 launchAll과 결합하면 사실상 모든 동시성 시나리오리를 처리할 수 있다. 하지만 문제가 하나 있었다. flatMapMerge가 원래 순서를 보장하지 않는다는 것이다. 이를 보정하는 방법은 최초의 인덱스를 보존하는 것이다. 코틀린은 List의 원소를 인덱스와 묶은 원소로 바꿔서 새로운 List를 만들어주는 withIndex()메소드가 있다. 이를 활용하자.

val flow:Flow<String> = listOf(1, 2, 3, 4).withIndex().asFlow().flatMapMerge(2){(index, value)->
    flow<Pair<Int, String>>{
        delay(Random.nextInt(50, 201))
        emit(index to "$value")
    }
}
launch{
    print(flow.toList().sortedBy{it.first}.map{it.second}.joinToString(","))
}

이 방법은 원래 인덱스를 같이 출력하므로 모두 collect한 뒤 인덱스로 정렬하여 value만 추출하는 방식을 사용한다.

suspend fun <T:Any, V:Any> List<T>.awaitAllIndexed(context:CoroutineContext = Dispatchers.Default, concurrency:Int = 16, noinline block:suspend (Int, T)->V):LF<V>{
    val result:ArrayList<Pair<Int, V>> = arrayListOf()
    withIndex().asFlow().flowOn(context).flatMapMerge(concurrency){
        flow<Pair<Int, V>?>{block(it.index, it.value)?.let{v->emit(it.index to v)}}
    }.collect{it?.let {v->result.add(v)}}
    return result.sortedBy{it.first}.map{it.second}.toArrayList()
}

launch{
    val result = listOf(url1, url2, url3, url4).awaitAllIndexed { index, url ->
        flow{
            WebClient.builder().baseUrl(url1).build().post()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(request[index])
                .retrieve()
                .awaitBody<String>()
        }
    }
    println(result.joinToString("\n"))
}.await()

아주 손쉽게 여러 API질의를 동시에 처리하여 결과를 수집할 수 있다. 특히 concurrency를 조정하면 매우 손쉽게 동시처리 제한도 걸 수 있다.

3-6 결론

코루틴은 매우 강력하지만 생각보다 많은 사전지식을 요구한다. 섬세하지 않게 쓰고 싶다면 충분한 추상화가 필요하다. Flow는 결국 suspend함수의 래퍼일 뿐이라 매우 가볍다. 기존 코틀린의 전략은 이러한 복잡성을 코루틴스코프에 감추고 스코프를 통해 동시성 제어를 하도록 권장했다. 하지만 스코프는 기본적으로 무겁고 여전히 복잡하며 실수하기 쉬운 구조로 되어있다. 이에 비해 위에 구현된 launch, launchAll, awaitAll 등을 이용하면 매우 손쉽게 통제하면서도 오버헤드는 최소화된다.

연습문제

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment