Jwhyee / kotlin-coroutine-study

코틀린 코루틴 북 스터디 📚
4 stars 1 forks source link

2부 코틀린 코루틴 라이브러리 - 2 #3

Open lee-ji-hoon opened 5 months ago

lee-ji-hoon commented 5 months ago

3월 31일 11, 12, 13장

Jaeeun1083 commented 5 months ago

지난 정리에서 launch()의 내부 동작을 확인하며 coroutine.start(start, coroutine, block)가 뭘 하는 건지 해결이 안된 채로 넘어갔었다.

간단하지만 그 의문에 대해 해결해보자.

CoroutineScope의 extension function - launch

// Builders.common.kt
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

// jvmMain/CoroutineContext.kt
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
  val combined = coroutineContext + context
  val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
  return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
    debug + Dispatchers.Default else debug
}

// Builders.common.kt
private open class StandaloneCoroutine(
  parentContext: CoroutineContext,
  active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
  override fun handleJobException(exception: Throwable): Boolean {
    handleCoroutineException(context, exception)
    return true
  }
}

// Builders.common.kt
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
  start(block, receiver, this)
}

coroutine.start(...) 이 부분만 보면 무한 반복 함수처럼 보일 수 있다.

그러나 CoroutineStart를 살펴보면, invoke 연산자 오버로딩이 되어 있어서 이 부분이 호출된다.

// CoroutineStart.kt
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit // will start lazily
    }

이로 인해 해당 블록을 수신자(receiver)로 하여 이 코루틴 시작 전략(coroutine start strategy)으로 코루틴을 시작하게 되는 것이다.

[ invoke 함수란? ]

https://kotlinlang.org/docs/lambdas.html#function-types

코루틴 시작 전략 중 DEFAULT에 해당하는 startCoroutineCancellable만 살펴보도록 하자.

// Cancellable.kt
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

// IntrinsicsJVM.kt
@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
  completion: Continuation<T>
): Continuation<Unit> {
  val probeCompletion = probeCoroutineCreated(completion)
  return if (this is BaseContinuationImpl)
    create(probeCompletion)
  else
    createCoroutineFromSuspendFunction(probeCompletion) {
      (this as Function1<Continuation<T>, Any?>).invoke(it)
    }
}
// BaseContinuationImpl.kt
public open fun create(completion: Continuation<*>): Continuation<Unit> {
  throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
// IntrinsicsJVM.kt
@SinceKotlin("1.3")
private inline fun <T> createCoroutineFromSuspendFunction(
    completion: Continuation<T>,
    crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
    val context = completion.context
    // label == 0 when coroutine is not started yet (initially) or label == 1 when it was
    return if (context === EmptyCoroutineContext)
        object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
                        block(this) // run the block, may return or suspend
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow() // this is the result if the block had suspended
                    }
                    else -> error("This coroutine had already completed")
                }
        }
    else
        object : ContinuationImpl(completion as Continuation<Any?>, context) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
                        block(this) // run the block, may return or suspend
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow() // this is the result if the block had suspended
                    }
                    else -> error("This coroutine had already completed")
                }
        }
}

여기서 context의 EmptyCoroutineContext 여부에 따라 만드는 ContinuationImpl 객체가 달라지는데 launch()를 호출하면

맨 위에 나와있듯이 val newContext = newCoroutineContext(context)를 호출하여 Dispatcher.Default를 주입하므로 ContinuationImpl이 구현된다

다시 createCoroutineUnintercepted() 호출한 곳으로 돌아가서 나머지 부분을 확인해보자.

// Cancellable.kt
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
  is DispatchedContinuation -> resumeCancellableWith(result)
  else -> resumeWith(result)
}

intercepted()는 launch에 디스패처를 설정하였다면 동작하게 된다. 이후resumeWith를 호출하며 launch의 내부 동작이 마무리된다.

정리

launch()의 동작 과정을 다시 정리하자면

  1. launch() 호출 시 파라미터로 받은 context 객체를 newCoroutineContext() 호출을 통해 주어진 context와 현재 CoroutineScope의 context를 결합하여 새로운 CoroutineContext를 만든다.
  2. 생성한 context 를 사용하여 Coroutine 객체를 만든다.
  3. launch() 호출 시 파라미터로 받은 CoroutineStart 값에 따라 코루틴 시작 전략을 정하여 코루틴을 시작한다.
lee-ji-hoon commented 5 months ago

Main.immediate

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    CoroutineScope(Dispatchers.Main).launch {
        Log.d(TAG, "first main Coroutine ${Thread.currentThread().name}")
    }

    CoroutineScope(Dispatchers.Main.immediate).launch {
        Log.d(TAG, "second main immediate Coroutine ${Thread.currentThread().name}")
    }

    CoroutineScope(Dispatchers.IO).launch {
        CoroutineScope(Dispatchers.Main.immediate).launch {
            Log.d(TAG, "third main immediate Coroutine ${Thread.currentThread().name}")
        }
        Log.d(TAG, "third IO Coroutine ${Thread.currentThread().name}")
    }
}

위 코드가 있다고 할 때 예상되는 출력은 아래와 같다.

  1. first main
  2. second main immediate
  3. third main immediate
  4. third IO

이렇게 예상하는 이유는 스코프 내에서 launch 블럭안에서 추가적은 suspend point가 있는 것이 아니기에 순차적으로 실행이 될 것이라고 예상하기 때문이다.

image

실제 실행 후 출력 결과

  1. first main > second main immediate
  2. second main immediate > third IO
  3. third main immediate > first main
  4. third IO > third main immediate

이렇게 출력이 되는 이유는 아래와 같다.

Main.Immediate 의 주석 정리

즉 launch 되기 전에 Main이나 IO 같은 경우 리디스패치가 이뤄지기 때문에 block이 타이밍상 Main.Immediate 보다 느리게 동작할 수 있다는 의미이기에 이러한 출력 결과를 볼 수 있다.

Coroutine Dispatcher


말 그대로 스레드에서 코루틴을 보낸다 라는 의미가 담겨져있다. 즉 코루틴을 dispachter에 전송하면 자신이 관리하는 스레드풀 내의 스레드의 부하 상태에 맞춰 코루틴을 배분한다!

test

코루틴이 쓰레드풀에 분배되는 과정!

코루틴에서 쓰레드풀 만들기

val dispatcher = newFixedThreadPoolContext(3, "ThreadPool")

하지만 아까 말한거처럼 Dispacher를 통해 제어할 수 있다.

자주 사용하는 Dispacher 종류

이미 Dispacher가 생성돼 있어 별도로 생성하거나 정의할 필요가 없다.

Dispatchers.Main 

Dispatchers.IO 

Dispatchers.Default 

CoroutineScope(Dispatchers.Default).launch {
    byteArrayToBitmap(byteArray)
}
Jwhyee commented 5 months ago

coroutineScope

public suspend fun <R> coroutineScope(
    block: suspend CoroutineScope.() -> R
): R {  
    contract {  
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)  
    }  
    return suspendCoroutineUninterceptedOrReturn { uCont ->  
        val coroutine = ScopeCoroutine(uCont.context, uCont)  
        coroutine.startUndispatchedOrReturn(coroutine, block)  
    }  
}

withContext

public suspend fun <T> withContext(  
    context: CoroutineContext,  
    block: suspend CoroutineScope.() -> T  
): T {  
    contract {  
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)  
    }  
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->  
        val oldContext = uCont.context  
        val newContext = oldContext + context  
        newContext.ensureActive()  
        if (newContext === oldContext) {  
        val coroutine = ScopeCoroutine(newContext, uCont)  
        return@sc coroutine.startUndispatchedOrReturn(coroutine, block)  
        }  
        if (newContext[ContinuationInterceptor] ==
            oldContext[ContinuationInterceptor]) {  
            val coroutine = UndispatchedCoroutine(newContext, uCont)  
            withCoroutineContext(newContext, null) {  
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)  
            }  
        }  
        val coroutine = DispatchedCoroutine(newContext, uCont)  
        block.startCoroutineCancellable(coroutine, coroutine)  
        coroutine.getResult()  
    }  
}

supervisorScope

public suspend fun <R> supervisorScope(
    block: suspend CoroutineScope.() -> R
): R {  
    contract {  
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)  
    }  
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        // SupervisorCoroutine
        val coroutine = SupervisorCoroutine(uCont.context, uCont)  
        coroutine.startUndispatchedOrReturn(coroutine, block)  
    }  
}

왜 withContext(SupervisorJob())은 에러를 못잡을까?

withContext 함수를 보면 인자로 context를 받은 뒤, 현재 context 와 인자로 받은 context를 더하는 코드를 볼 수 있다.

public suspend fun <T> withContext(  
    context: CoroutineContext,  
    block: suspend CoroutineScope.() -> T  
): T {  
    contract {  
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)  
    }  
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->  
        val oldContext = uCont.context  
        val newContext = oldContext + context
        // ...
        if (newContext[ContinuationInterceptor] ==
                oldContext[ContinuationInterceptor]) {  
            // UndispatchedCoroutine
            val coroutine = UndispatchedCoroutine(newContext, uCont)  
            withCoroutineContext(newContext, null) {  
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)  
        }  
    }
    // ..
    }  
}

그렇다면 아래 코드에서 oldContextCoroutineName("Parent")가 될 것이고, newContext는 기존 컨텍스트와 SupervisorJob을 합친 것이 될 것이다.

fun main() = runBlocking(CoroutineName("Parent")) {  
    log("Before")  
    withContext(SupervisorJob()) {  
        log("IN")  
        launch {  
            delay(1000)  
            throw Error()  
        }  
    }  
    log("After")  
}

newContextCoroutineName(Parent), SupervisorJobImpl{Active}@56143a9a라는 값을 가지게 되는데 왜 에러가 터지면 잡지 못하는걸까?

supervisorScope, withContext 두 함수 모두 suspendCoroutineUninterceptedOrReturn를 반환한다. 하지만 그 내부를 보면, 사용하는 코루틴이 다른 것을 알 수 있다. supervisorScope의 경우 SupervisorCoroutine을 사용하지만, withContext의 경우 UndispatchedCoroutine을 사용하는 것을 볼 수 있다.

SupervisorCoroutinechildCancelled 함수를 override 하지만, UndispatchedCoroutine은 해당 함수를 override하지 않는 것을 알 수 있다.

책에서 나온 것과 같이 결국 withContext(SupervisorJob())에서의 SupervisorJob()은 의미가 없으며, 사실상 쓸모가 없게 되는 것이다.

startUndispatchedOrReturn

그렇다면 전 주에 나왔던 startUndispatchedOrReturn는 어디서 사용하는 것일까? 각 스코프 함수를 보면 결론적으로 모두 coroutine.startUndispatchedOrReturn를 사용하는 것을 볼 수 있다.

internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(
    receiver: R, 
    block: suspend R.() -> T
): Any? {  
    return undispatchedResult({ true }) {  
        block.startCoroutineUninterceptedOrReturn(receiver, this)  
    }  
}

함수 이름 그대로 디스패치되지 않은 상태에서 실행할 수 있는지를 결정하며, 가능한 경우에는 람다를 실행해 결과 값을 반환하기 위한 것이다. 만약 디스패치 되지 않은 상태에서 실행할 수 없다면 null이 반환될 수도 있다.

private inline fun <T> ScopeCoroutine<T>.undispatchedResult(  
    shouldThrow: (Throwable) -> Boolean,  
    startBlock: () -> Any?  
): Any? {  
    val result = try {  
        startBlock()  
    } catch (e: Throwable) {  
        CompletedExceptionally(e)  
    }  

    // 중단된 상태일 경우 중단 상태 반환
    if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1)  
    // 현재 코루틴의 상태를 가져옴
    val state = makeCompletingOnce(result)  
    // 자식 코루틴의 완료를 기다리고 있을 경우 중단 상태 반환
    if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // (2) 
    // 정말 완료된 상태인지, 예외로 완료된 상태인지 확인
    return if (state is CompletedExceptionally) { // (3)  
        // 예외가 터진 상태일 경우 recoverStackTrace를 던짐
        when {  
            shouldThrow(state.cause) -> 
                throw recoverStackTrace(state.cause, uCont)  
            result is CompletedExceptionally -> 
                throw recoverStackTrace(result.cause, uCont)  
            else -> result  
        }  
    } else {
        // 예외가 발생하지 않았을 경우 state를 언박싱
        // 해당 함수 이후의 과정은 이해하지 못함.
        state.unboxState()  
    }  
}

internal fun Any?.unboxState(): Any? = 
    (this as? IncompleteStateBox)?.state ?: this

withTimeout

lee-ji-hoon commented 5 months ago

다음주까지

lee-ji-hoon commented 4 months ago

왜 IO는 64개이고 Default는 8개 일까?

우선 IO부터 보면 Default는 자연스럽게 추측이 가능해진다.

아래는 공식문서의 답변이다.
The CoroutineDispatcher that is designed for offloading blocking IO tasks to a shared pool of threads.
Additional threads in this pool are created and are shutdown on demand. The number of threads used by tasks in this dispatcher is limited by the value of “kotlinx.coroutines.io.parallelism” ( IO_PARALLELISM_PROPERTY_NAME) system property. It defaults to the limit of 64 threads or the number of cores (whichever is larger).
Moreover, the maximum configurable number of threads is capped by the kotlinx.coroutines.scheduler.max.pool.size system property. If you need a higher number of parallel threads, you should use a custom dispatcher backed by your own thread pool.

요약을 해보면 64개의 Thread 혹은 Core의 개수 중 큰 값에 의해 제한이 된다.

단순이 이렇게만 적혀져 있으니 이해가 쉽지 않으니 실제로 선언이 되어 있는 부분을 보자.

Dispatchers.IO

public val IO: CoroutineDispatcher = DefaultScheduler.IO

DefaultScheduler.IO

val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))

위처럼 64를 기준으로 coerceAtLeast를 사용해서 PROCESSORS의 개수와 비교하여 더 큰 값을 사용한다.

그렇기에 최소 64개 라는 것이 확실히 알 수 있다.

그리고 실제로 blocking이 되고 dispatch가 되는 부분 또한 함수에서 찾을 수 있었다.

blocking

public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
    require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
    return LimitingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
}

LimitingDispatcher::dispatch

override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

private fun dispatch(block: Runnable, fair: Boolean) {
    // (snip)
    // note: parallelism here is the systemProp(...) passed to blocking(...) earlier, and is usually 64
    if (inFlight <= parallelism) {
        dispatcher.dispatchWithContext(taskToSchedule, this, fair)
        return
    }

    // Parallelism limit is reached, add task to the queue
    queue.add(taskToSchedule)
    // (snip)

}
그럼 왜 64개를 기본값으로 사용하는 것일까? 더 늘릴 수 있으니 더 늘려서 사용하는게 좋지 않나?

라는 질문에 도착을 하게 된다. 그럼 하나 가정해보자.

스레드 풀이 무제한으로 확장되는 경우, 시스템이 처리할 수 있는 동시 요청보다 더 많은 동시 요청을 생성하려고 시도함으로써 시스템 리소스를 고갈시키게 된다.

예시) 슬로우 로리스(Slow Loris)

각 연결마다 새 스레드를 생성하는 웹 서버와 천 개 이상의 클라이언트가 최대한 느리게 요청을 보내는 상황(예:1바이트/10초)

-> 서버에 최대 스레드 수가 있다면, 나중에 요청은 그냥 삭제될 것이지만 하지만 최소한 다른 서비스를 다운시키지는 않을 것이다.

클라이언트와 서버

사실 클라이언트에게는 이러한 공격이 실제로 문제가 되지 않는다. 이것의 예시로 okhttp기본적으로 무제한 풀을 사용하고 있다. 그러나 kotlin-coroutine은 클라이언트 뿐만 아니라 서버에서도 적용되는 라이브러리이기에 64라는 합리적인 기본값을 채택한 것으로 보인다.

마지막으로 Default가 8개인 이유는 IO와 반대로 core의 수를 기본으로 따라가며 Default를 사용하는 경우는 CPU 집약적인 작업 즉 한 번 작업을 하면 오랫동안 연산을 하는 작업이다보니 Thread 수까지는 필요가 없고 CPU Core 수를 따라가게끔 만든 것으로 추측이 된다.

Dispatcher.Default 사용을 조심해야 한다.

IO(64) / Default(8)개

만약 Default를 계속 사용중인데 Blocking이 되는 작업이 8개가 넘어간다면 계속해서 지연이 될 수 밖에 없다.

그렇기에 상당 기간 Thread를 차단할 수 있는 작업에서는 꼭 지양을 해야 한다.

말로만 하니 어려우니 Coroutine에서 Default가 어떻게 선언이 되어 있는지와 왜 Default 사용을 조심해야하는지도 보자.

Runtime.getRuntime().availableProcessors()

Runtime.getRuntime().availableProcessors() // 실제 Default의 Thread 수

image

image

위 처럼 선언이 되어있다.

코드 예시

val defaultDispatcherThreadPollSize: Int
    get() = Runtime.getRuntime().availableProcessors()

suspend fun main(): Unit = coroutineScope {
    println(defaultDispatcherThreadPollSize)
    launch {
        printCoroutinesTime(Dispatchers.Default)
    }
}

private suspend fun printCoroutinesTime(
    dispatcher: CoroutineDispatcher
) {
    val test = measureTimeMillis {
        coroutineScope {
            // default에서 사용 가능한 Thread보다 1개 많게 작업
            repeat(defaultDispatcherThreadPollSize + 1) {
                launch(dispatcher) {
                    Thread.sleep(1000)
                }
            }
        }
    }
    println("#1 $dispatcher took: $test")
}

기존에는 Default를 사용중이기 때문에 약 1000ms가 걸릴 것이라고 예측했지만 아래와 같은 결과가 나왔다.

image

위와 같은 결과가 나온 이유는 Default의 제한된 Thread Pool - Core 수 보다 더 많은 작업을 하고 있기 때문이다.

image

이렇게 되는 것이다.

그렇기에 Default를 사용할때는 진짜로 이 작업이 CPU 집약적인 작업이고 Blocking이 오래 되지는 않는 것인지 확신이 있어야 한다.