Jwhyee / kotlin-coroutine-study

코틀린 코루틴 북 스터디 📚
4 stars 1 forks source link

2부 코틀린 코루틴 라이브러리 - 1 #2

Open lee-ji-hoon opened 7 months ago

lee-ji-hoon commented 7 months ago
Jwhyee commented 7 months ago

6. 코루틴 빌더

launch 빌더

현업에서는 GlobalScope를 지양해야 한다고 한다. 우선 launch가 작동하는 방식은 thread 함수를 호출해 새로운 스레드를 시작하는 것과 비슷하다.

fun main() {  
    GlobalScope.launch {  
        delay(1000L)  
        println("Text")  
    }  
}

위처럼 GlobalScope를 사용할 경우 IDE에서 다음과 같은 경고를 띄워준다.

This is a delicate API and its use requires care. Make sure you fully read and understand documentation of the declaration that is marked as a delicate API.

Kotlin Coroutines 1.5.0 버전 이후로 @DelicateCoroutinesApi 어노테이션이 붙었는데, GlobalScope는 이름과 같이 전역에서 사용되고, 잡(job)에 바인딩되지 않는다. 만약 다음과 같이 OpenApi로 부터 데이터를 받아오는 코드가 있다고 가정하자.

fun main() {  
    GlobalScope.launch {  
        getDataFromOpenApi()  
    }  
}

만약 서버의 성능이 좋지 않아 네트워크가 느릴 경우 백그라운드에서 계속 대기하면서 리소스를 소비하게 된다. 즉, 중단되거나 지연될 경우에도 작동을 멈추지 않는다는 것이다.

또한 launch 내부에 있는 코드를 실행하기 위해서는 현재 스레드가 블로킹 되어야 하는데, Thread.sleep과 같은 함수를 사용하지 않으면 코루틴이 일할 기회조차 주지 않게 되는 것이다.

7. 코루틴 컨텍스트

중단 함수에서 컨텍스트에 접근하기

아래 코드를 실행하면 다음과 같은 결과가 나온다.

suspend fun main() = withContext(CoroutineName("Outer")){  
    printName()  
    launch(CoroutineName("Inner")) {  
        printName()  
    }  
    delay(10)  
    printName()  
}
Outer
Inner
Outer

그렇다면 만약 delay(10)을 주석처리하면 어떨까?

Outer
Outer
Inner

delay가 없을 경우 Outer의 자식 printName()이 먼저 실행되고, 이후 처리하지 않은 코루틴인 launch를 실행하게 된다. 하지만 delay가 있을 경우 현재 스레드가 블로킹 되었기 때문에 자식 코루틴을 실행한 뒤, 다시 메인으로 돌아와 남은 작업을 처리하게 된다.

8. 잡과 자식 코루틴 기다리기

자식은 어떻게 부모 컨텍스트를 상속 받을까?

fun main(): Unit = runBlocking(CoroutineName("main")) {  
    val name = coroutineContext[CoroutineName]?.name  
    println(name)  
    launch {  
        delay(1000)  
        val childContextName = coroutineContext[CoroutineName]?.name  
        println(childContextName)  
    }  
}

위 코드를 디컴파일하면 다음과 같은 바이트 코드가 나온다.

...
BuildersKt.runBlocking(
    (CoroutineContext)(new CoroutineName("main")), 
    (Function2)(new Function2((Continuation)null) 
{
    // launch 블럭
    BuildersKt.launch$default(
    $this$runBlocking, 
    (CoroutineContext)null, 
    (CoroutineStart)null, 
    (Function2)(new Function2((Continuation)null) {
        CoroutineName var10000 = 
        (CoroutineName)$this$launch.getCoroutineContext()
            .get((CoroutineContext.Key)CoroutineName.Key);  
        String childContextName = var10000 != null ? var10000.getName() : null;  
        System.out.println(childContextName);  
        return Unit.INSTANCE;  
    }
}

위처럼 launch$this$runBlocking을 인자로 넣어줘, 부모 컨텍스트를 주입하는 것을 볼 수 있다. 때문에 var10000에서 $this$launch.getCoroutineContext()를 했을 때, 부모 컨텍스트를 그대로 가져와 이름을 출력할 수 있는 것이다.

만약 부모 컨텍스트가 아닌 다른 컨텍스트를 사용할 경우 다음과 같이 launch에 자식 전용 CoroutineName을 지정해 새로운 컨텍스트를 사용하게 된다.

BuildersKt.runBlocking(
    (CoroutineContext)(new CoroutineName("main")), 
    (Function2)(new Function2((Continuation)null) 
{
    BuildersKt.launch$default(
        $this$runBlocking, 
        (CoroutineContext)(new CoroutineName("child")), 
        (CoroutineStart)null, 
        (Function2)(new Function2((Continuation)null) 
    {
        // ...
    }
}

9. 취소

경쟁 상태(race condition)

경쟁 상태는 말 그대로 경쟁을 하는 상태인 것이다. 멀티 스레드 환경에서 공유 자원에 접근해 상태를 변경할 때 발생한다.

suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {  
    val n = 100  
    val k = 1000  
    val time = measureTimeMillis {  
        val jobs = List(n) {  
            launch { repeat(k) { action() } }  
        }  
        jobs.forEach { it.join() }  
    }  
    println("Completed ${n * k} actions in $time ms")  
}  

var counter = 0  
fun main() = runBlocking {  
    GlobalScope.massiveRun {  
        counter++  
    }  
    println("Counter = $counter")  
}
Completed 100000 actions in 15 ms
Counter = 66505

위와 같이 원하는 결과가 아닌 전혀 다른 결과가 나왔다. 코드를 보면 정확히 100,000개의 job을 리스트에 추가했고, 해당 리스트를 돌면서 각 원소에 대한 join()까지 호출했다. 왜 이런 결과가 발생하는걸까?

우선, launch 함수 안에 스레드 정보를 출력해보면 다음과 같이 여러 스레드를 사용하고 있는 것을 볼 수 있다.

Thread[#22,DefaultDispatcher-worker-2,5,main]
Thread[#24,DefaultDispatcher-worker-4,5,main]
Thread[#23,DefaultDispatcher-worker-3,5,main]
Thread[#21,DefaultDispatcher-worker-1,5,main]
Thread[#25,DefaultDispatcher-worker-5,5,main]
...

이는 한 스레드가 변수를 읽고 증가하기 전에 다른 스레드가 이미 값을 변경해 발생하는 것이다. 이렇게 경쟁 상태(race condition)이 발생하게 되면서 동시성 이슈가 발생하게 된다.

이를 해결하는 가장 간단한 방법은 counterAtomicInteger 타입으로 변경하는 것이지만, 또 다른 방법으로는 코루틴의 동시성을 보장하도록 구조를 변경하는 것이다.

suspend fun massiveRun(action: suspend () -> Unit) {  
    coroutineScope {  
        val n = 100  
        val k = 1000  
        val time = measureTimeMillis {  
            val jobs = List(n) {  
                launch { repeat(k) { action() } }  
            }  
            jobs.forEach { it.join() }  
        }  
        println("Completed ${n * k} actions in $time ms") 
    }  
}  

var counter = 0  
fun main() = runBlocking {  
    massiveRun { counter++ }  
    println("Counter = $counter")  
}

GlobalScope의 경우 애플리케이션 전역에서 동작해 여러 스레드에서 코루틴이 실행된다. 하지만 coroutineScope의 경우 하나의 스레드에서만 코루틴이 동작하게 된다.

Jaeeun1083 commented 7 months ago

Job은 코루틴을 취소하고 상태를 파악하는 등의 역할을 한다는데 그 동작 과정에 대해 알아보자.

Job이란?

코루틴의 실행 상태를 나타내는 인터페이스이다. 각 코루틴은 백그라운드에서 실행될 작업을 나타내는 Job을 가지고 있다.

Job은 코루틴의 생명주기와 관련이 있으며, 주로 코루틴의 실행을 추적하고 관리하는 데 사용된다.

아래의 빌더로 잡을 생성하는 간단한 예제를 통해 Job의 동작 과정에 대해 알아보자.

suspend fun main() = coroutineScope {
    val job = Job()
    println("before complete job : $job")

    job.complete()
    println("after complete job : $job")
}
// Job.kt
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)

// JobSupport.kt
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
  init { initParentJob(parent) }
  // ...
}

// JobSupport.kt
protected fun initParentJob(parent: Job?) {
  assert { parentHandle == null }
  if (parent == null) { // parent 가 없을 경우 
    parentHandle = NonDisposableHandle
    return
  }
  parent.start() // make sure the parent is started
  @Suppress("DEPRECATION")
  val handle = parent.attachChild(this) // 자신의 job을 부모의 child로 붙인다 -> 트리 구조가 형성
  parentHandle = handle
  if (isCompleted) {
    handle.dispose()
    parentHandle = NonDisposableHandle
  }
}

// JobSupport.kt
public final override fun attachChild(child: ChildJob): ChildHandle {
  return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child).asHandler) as ChildHandle
}

만약 예제 코드에서 Job을 생성할 때 부모 Job을 넘긴다면 parent.attachChild(this) 를 통해 자식 Job이 되는 것이다.

suspend fun main() = coroutineScope {
  val job = Job(coroutineContext[Job])
  println("before complete job : $job")

  job.complete()
  println("after complete job : $job")
}

Job의 트리가 어떤 과정을 통해 구성되는지 알겠는데 이 Job이 코루틴에서 어떻게 쓰이는 걸까??

코루틴 빌더는 부모의 잡을 기초로 자신들의 잡을 생성한다.

모든 코루틴은 자신만의 Job을 생성하며 인자 또는 부모 코루틴으로 부터 온 잡은 새로운 잡의 부모로 사용된다.

부모와 자식 관계에 있다면 부모가 자식 코루틴을 기다리게 되는 것이다.

그렇다면 코루틴 빌더는 어떤식으로 구현이 되어있기에 Job을 기다리는 걸까..? 내부 동작 방식을 알아보자.

coroutine builder

코틀린에서 제공하는 primitive coroutine builder에는 크게 3가지가 있는데, 아래와 같이 분류할 수 있다.

CoroutineScope의 extension function - launch

launch 와 async는 비슷하므로 launch의 구현을 확인해보자.

// Builders.common.kt
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

// jvmMain/CoroutineContext.kt
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
  val combined = coroutineContext + context
  val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
  return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
    debug + Dispatchers.Default else debug
}

// Builders.common.kt
private open class StandaloneCoroutine(
  parentContext: CoroutineContext,
  active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
  override fun handleJobException(exception: Throwable): Boolean {
    handleCoroutineException(context, exception)
    return true
  }
}

// Builders.common.kt
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
  start(block, receiver, this)
}

suspending function - coroutineScope

다음으로 suspending function 중 하나인 coroutineScope 동작 과정에 대해 살펴보자.

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn { uCont -> // uCont : 부모 coroutine
        val coroutine = ScopeCoroutine(uCont.context, uCont)
        coroutine.startUndispatchedOrReturn(coroutine, block)
    }
}

internal open class ScopeCoroutine<in T>(
  context: CoroutineContext,
  @JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame {
    // ...
}
lee-ji-hoon commented 7 months ago

코루틴 Job의 상태 변수


isActive, isCancelled, isCompleted

  • isActive : Job이 실행중인지 여부
  • isCancelled : Job이 취소가 요청됐는지 여부를 표시
  • isCompleted : Job 실행이 완료됐거나, 취소 완료됐는지를 표시
suspend fun main() {
    val job = CoroutineScope(Dispatchers.IO).launch(start = CoroutineStart.LAZY) {
        delay(1000)
    }
    printJob(job)
    job.start()
    printJob(job)
    delay(3000)
    printJob(job)
}

private fun printJob(job: Job) {
    println("job.isActive -> ${job.isActive}")
    println("job.isCancelled -> ${job.isCancelled}")
    println("job.isCompleted -> ${job.isCompleted}")
    println()
}
suspend fun main() {
    val job = CoroutineScope(Dispatchers.IO).launch {
        delay(1000)
    }
    printJob(job)
    job.cancel()
    printJob(job)
    delay(3000)
    printJob(job)
}

private fun printJob(job: Job) {
    println("job.isActive -> ${job.isActive}")
    println("job.isCancelled -> ${job.isCancelled}")
    println("job.isCompleted -> ${job.isCompleted}")
    println()
}
상태 isActive isCancelled isCompleted
생성됨(New) false false false
실행 중(Active) true false false
실행 완료(Completed) false false true
취소 중(Cancelling) false true false
취소 완료(Cancelled) false true true

Deferred를 이용한 결과 수신

Deferred는 말 그대로 연기라는 뜻을 갖고 있다. 결과값 수신을 연기한다 즉 미래의 어느 시점에 결과값이 올 것을 뜻하게 된다.

Deferred는 Job이다.

Deferred는 결과값을 수신하는 비동기 작업이다.

3

Deferred 생성 및 수신

suspend fun main() {
    val deferred: Deferred<Int> = CoroutineScope(Dispatchers.IO).async {
        30
    }
    println(deferred.await())
}

Deferred가 Job과 다른점

suspend fun main() {
    val exceptionHandler = CoroutineExceptionHandler { _, exception ->
        when(exception){
            is IllegalArgumentException -> println("More Arguement Needed To Process Job")
            is InterruptedException -> println("Job Interrupted")
        }
    }

    val deferred = CoroutineScope(Dispatchers.IO).async(exceptionHandler) {
        throw IllegalArgumentException()
        arrayOf(1,2,3)
    }

    delay(1000)
}
suspend fun main() {
    val exceptionHandler = CoroutineExceptionHandler { _, exception ->
        when(exception){
            is IllegalArgumentException -> println("More Arguement Needed To Process Job")
            is InterruptedException -> println("Job Interrupted")
        }
    }

    val deferred = CoroutineScope(Dispatchers.IO).async(exceptionHandler) { 
        throw IllegalArgumentException()
        arrayOf(1,2,3)
    }

    deferred.await()
}

4

suspend fun main() {
    val exceptionHandler = CoroutineExceptionHandler { _, exception ->
        when (exception) {
            is IllegalArgumentException -> println("More Arguement Needed To Process Job")
            is InterruptedException -> println("Job Interrupted")
        }
    }

    val deferred = CoroutineScope(Dispatchers.IO).async {
            throw IllegalArgumentException()
            arrayOf(1, 2, 3)
        }

    CoroutineScope(Dispatchers.IO).launch(exceptionHandler) { 
        // Deferred로부터 애러를 전파받는 위치
        deferred.await()
    }.join()
}

CoroutineContext

Coroutine이 실행되는 환경이라고 이해하면 편하다.

5

6

Dispatcher나 Handler 자리에 CoroutineContext가 필요하다고 나와있는데 자리에 사용하는 것으로 보아 하나 유추할 수 있다.

CoroutineExceptionHandler 와 Dispatcher는 CoroutineContext를 구현한 구현체라는 것이다!

CoroutineContext 합치기

7

suspend fun main() {
    val exceptionHandler = CoroutineExceptionHandler { _, exception ->
        when(exception){
            is IllegalArgumentException -> println("More Arguement Needed To Process Job")
            is InterruptedException -> println("Job Interrupted")
        }
    }

    **val coroutineContext = Dispatchers.IO + exceptionHandler**
    val deferred = CoroutineScope(coroutineContext).async() {
        throw IllegalArgumentException()
        arrayOf(1,2,3)
    }
    deferred.await()
}

Context에 접근하기

fun main() {
    val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> }

    val coroutineContext = Dispatchers.IO + exceptionHandler 
        // 부모 CoroutineContext = Dispatcher+ExceptionHandler

    val exceptionHandlerFromContext = coroutineContext[exceptionHandler.key] 
        // Key를 통한 자식 CoroutineContext 접근

    println(exceptionHandler == exceptionHandlerFromContext)
}

8

Context 제거하기

fun main() {
    val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> }

    val coroutineContext = Dispatchers.IO + exceptionHandler 
        // 부모 CoroutineContext = Dispatcher+ExceptionHandler

    val exceptionHandlerFromContext = coroutineContext.minusKey(exceptionHandler.key)
    println(exceptionHandlerFromContext)
}

9

10

SupervisorJob을 활용한 Exception Handling

Coroutine은 비동기 프로그래밍을 위한 일시중단 가능한 경량 스레드이다. 코루틴 내부에서 코루틴이 수행될 수 있다.

  • 코루틴 내부에서 자식 코루틴의 에러가 생겼을 때 별도의 Exception Handling을 해주지 않는다면?
  • 자식 코루틴은 부모 코루틴까지 취소시키게 된다.
  • 그리고 부모 코루틴이 취소되면 자식 코루틴 또한 취소가 된다.

0

자식 코루틴1에서 에러가 났을 경우

단방향 Exception 전파를 위한 SupervisorJob

suspend fun main() {
    val supervisor = SupervisorJob()

    CoroutineScope(Dispatchers.IO).launch {
        val firstChildJob = launch(Dispatchers.IO + supervisor) {
            throw AssertionError("첫 째 Job이 AssertionError로 인해 취소됩니다.")
        }

        val secondChildJob = launch(Dispatchers.Default) {
            delay(1000)
            println("둘 째 Job이 살아있습니다.")
        }

        val thirdChildJob = launch(Dispatchers.Unconfined) {
            delay(1000)
            println("셋 째 Job이 살아있습니다.")
        }

        firstChildJob.join()
        secondChildJob.join()
        thirdChildJob.join()
    }.join()
}

2

만약 SupervisorJob을 뺀다면 이전에 그린 그림처럼 될까?

3

supervisorScope

하지만 매번 모든 scope에 supervisor을 걸어두는건 너무 귀찮은 일이다.

  • 이럴때 사용하는것이 supervisorScope이다!
suspend fun main() {
    CoroutineScope(Dispatchers.IO).launch {
        **supervisorScope {**
            val firstChildJob = launch(Dispatchers.IO) {
                throw AssertionError("첫 째 Job이 AssertionError로 인해 취소됩니다.")
            }

            val secondChildJob = launch(Dispatchers.Default) {
                delay(1000)
                println("둘 째 Job이 살아있습니다.")
            }

            val thirdChildJob = launch(Dispatchers.Unconfined) {
                delay(1000)
                println("셋 째 Job이 살아있습니다.")
            }

            firstChildJob.join()
            secondChildJob.join()
            thirdChildJob.join()
        **}**
    }.join()
}

supervisorScope에 대해서 좀 더 파보고 싶었는데 시간이 부족해서 조금만 미루기로

lee-ji-hoon commented 7 months ago

해결못한 것들

lee-ji-hoon commented 6 months ago

supervisorJob 예시에서 최상위 launch에만 job을 등록했을 때 왜 전부 취소가 되는지

image

CoroutineScope.launch

image

  1. launch로 실행이 되면 현재 CoroutineScope의 Context와 인자로 받은 Context를 합친다 > newContext
  2. 그리고 새로운 Coroutine을 만든다.
  3. 해당 Coroutine을 사용해서 block 부분을 실행(start)한다.
  4. Job(Coroutine)을 반환한다.

launch블럭을 보면 이렇게 해석이 되는데 여기서 왜 SupervisorJob은 전파가 안되는데 Dispatcher.IO 같은 다른 CorotuineContext는 전파가되는가. 이것을 알기 위해서는 2번을 가야 한다.

StandAloneCoroutine

isLazy가 true던 false던 상관 없다. LazyStandAloneCoroutine 도 결국에는 StandAloneCoroutine을 상속받아서 만들었기 때문

2번의 내용을 보면 새로운 Coroutine을 만드는데 이때 만들어지는 것은 StandAloneCoroutine 이고 이것은 AbstractCoroutine을 상속받아서 만들어져있다.

AbstractCoroutine

AbstractCoroutine은 Job이다.

image

  • 빨간색 - parentContext 즉 현재 부모 Scope의 CoroutineContext
  • 주황색 - 현재 AbstractCoroutine 자체

그럼 모든 launch / async 같은 경우 Context로 Job을 넘기는 것이 무의미한 것이 아닌가? 라는 의문이 드는데 그것은 아래 코드로 정리가 될 것 같다.

image

AbstractCoroutine init 부분

context가 새롭게 생성되기 전인 init 부분에서 initParentJob 라는 것을 사용해서 현재 Job이 존재한다면 미리 등록을 해두고 있다.

image

parentJob의 job을 보면 SupervisorJob이 등록되어 있는 모습을 확인할 수 있다.

그렇기에 내가 launch(SupervisorJob()) { .. } 이라고 한다면 launch가 실행이 될 때 parentJob에 SupervisorJob이 등록이 되고 { ... } 부터는 제거된 StandAloneCoroutine(Job)이라는 것을 알 수 있다.

정리

위 내용에서 중요한 부분만 정리하자면 아래와 같다.