Jwhyee / kotlin-coroutine-study

코틀린 코루틴 북 스터디 📚
4 stars 1 forks source link

1부 코틀린 코루틴 이해하기 #1

Open lee-ji-hoon opened 4 months ago

lee-ji-hoon commented 4 months ago

3월 10일 오후 2시

lee-ji-hoon commented 4 months ago

1장 코틀린 코루틴을 배워야 하는 이유

비동기 작업을 할 때 Thread를 생성해서 할 수도 있지만, Thread는 생성 비용도 크며, 멈출 수 있는 방법이 없어 메모리 누수로 이어지기도 하며, 관리하기가 어렵다는 단점이 있다.

✅ 비동기 처리 방법

위 2가지 방식이 있는데 콜백의 경우 A, B 두 개의 Call이 있을 때 무조건적으로 동기 방식으로 처리해야 하며, 비동기로 동작이 불가능하고 들여쓰기가 강제가 된다는 단점이 존재하는 반면 코루틴은 A, B 두 작업이 있을 때 동시에 처리 가능하고 들여쓰기 또한 필요가 없다는 점에서 가독성 측면에서 장점이 있다.

1장 마지막에 Coroutine 10만개와 Thread 10만개 테스트를 보면 왜 Coroutine이 경량 쓰레드인지 확인할 수 있었다.

2장 시퀀스 빌더

2장의 내용만 봤을 때 Sequence는 중단도 가능하고 Lazy 연산이기에 많은 이점이 있는 것으로 보이는데 List와 비교해 보면 어떨까 라는 생각으로 정리하게 됐습니다.

🤔 Sequence는 무조건 List보다 빠를까?

참고 블로그

상태 List 결과
Sequence image
Collection image

위 그림을 보면 Sequence의 경우 배열이 현재 Sequence 에서 몇 개가 내려올지 모르므로 기본값 opacity가 10인 상태로 List가 만들어지고 데이터가 계속 들어오다가 초과를 하게 되면 배열의 크기가 늘어나는 상태입니다.

그 반면 Collection의 경우 위에서 몇 개가 내려올지 이미 알고 있으므로 해당 크기만큼의 List에 opacity를 지정하고 만들기에 새로운 배열이 만들어지지 않습니다.

이 내용에서 Sequnece는 Lazy이므로 연산 작업의 경우 더 빠른 것이 맞으며, toList() 동작성에서 차이가 있는 것으로 보인다.

✅ Sequence의 toList

✅ Collection의 고차함수들

image

샘플 코드
```kotlin // 시간 및 메모리 측정 함수 inline fun measureTime(timeUnit: TimeUnit = TimeUnit.MILLISECONDS, block: () -> T): T { val startTime = System.nanoTime() val result: T = block() // 람다 블록의 실행 결과를 저장 val endTime = System.nanoTime() val duration = endTime - startTime // 선택한 TimeUnit에 따라 실행 시간을 변환 val convertedDuration = when (timeUnit) { TimeUnit.NANOSECONDS -> duration TimeUnit.MICROSECONDS -> TimeUnit.NANOSECONDS.toMicros(duration) TimeUnit.MILLISECONDS -> TimeUnit.NANOSECONDS.toMillis(duration) TimeUnit.SECONDS -> TimeUnit.NANOSECONDS.toSeconds(duration) TimeUnit.MINUTES -> TimeUnit.NANOSECONDS.toMinutes(duration) TimeUnit.HOURS -> TimeUnit.NANOSECONDS.toHours(duration) TimeUnit.DAYS -> TimeUnit.NANOSECONDS.toDays(duration) } println("Execution time: $convertedDuration ${timeUnit.name.lowercase()}") return result // 람다 블록의 실행 결과를 반환 } fun printUsedMemory() { val runtime = Runtime.getRuntime() val memoryUsed = (runtime.totalMemory() - runtime.freeMemory()) / 1024 println("Used memory: ${memoryUsed}KB") } ``` ```kotlin // 테스트 실행 코드 fun main() { val randomList = List(1000000) { (0..100).random() } printUsedMemory() measureTime { randomList .asSequence() .filter { it % 2 == 0 } .map { it } .toList() /*randomList .filter { it % 2 == 0 } .map { it }*/ } printUsedMemory() } ``` | 상태 | 결과 | | ------ | ------ | | Sequence | ![image](https://github.com/Jwhyee/kotlin-coroutine-study/assets/53300830/e5ee9d4f-811f-4deb-b036-eb2739ed560c) | | Collection | ![image](https://github.com/Jwhyee/kotlin-coroutine-study/assets/53300830/15251c78-3b01-4a13-924f-e2708ebbc044) |

3장 중단은 어떻게 작동할까?

✅ Continutation

Continutation이 처음 등장하면서 Coroutine에서 어떻게 함수를 중간에 중단하고 다시 재개할 수 있는지에 대한 내용을 시작하는지에 대해서 이야기를 하며 delay 함수를 직접 구현하는 과정 속에서 continutation에 왜 Unit 을 사용하는지 알려주는 내용이 괜찮았던 거 같다. 자연스럽게 내부 구조를 한번 보게 해 주었기 때문

✅ 함수가 아닌 코루틴을 중단시킨다.

마지막으로 3장에서는 함수가 아닌 코루틴을 중단시킨다. 이 내용이 가장 중요한 거 같다. continuration을 저장하고 resume을 호출해도 해당 함수가 호출이 안 되는 상황을 예시로 보여줬는데 이 내용은 4장을 이해하고 난 뒤 어떻게 수정해야지 after까지 호출이 될까 고민을 해보면 좋을 거 같다.

🤔 함수가 아닌 코루틴을 중단시킨다. 의 예제에서 어떻게 하면 after까지 호출이 될 수 있게 할까?

내가 생각한 정답

4장 코루틴의 실제 구현

3장 마지막에 코루틴 내부 구현까지 알 필요가 없다고 생각하면 넘겨도 된다고 했는데, 무조건 알아야 한다고 생각한다

가장 핵심으로 봐야 할 내용은 Continuation이 어떻게 함수를 재개할 때 함수의 상태를 기억하고 다시 재개하는가? 이것이 가장 핵심인 거 같다.

✅ suspend decompile

직접 디컴파일해서 봤을 때 가장 중요하게 생각한 것은 실행할 때마다 result, label을 저장하고 다시 호출될 때 switch에서 label을 기준으로 실행한다.

위 개념을 이해하면 어떻게 함수내에서 Coroutine이 재개할 때 어느 위치에서 이전에 실행한 값을 알고 있는지를 알 수 있었으며 책에서 나온 예제보다 조금 더 간단하게 예제를 하나 만들었다.

Continuation 간단 예제
```kotlin fun main() = doSomething(MyContinuationImpl()) class MyContinuationImpl( override val context: CoroutineContext = EmptyCoroutineContext, var label: Int = 0, ) : Continuation { var result: Any? = null override fun resumeWith(result: Result) { this.result = result.getOrThrow() println("Continuation.resumeWith() 호출\n") doSomething(this) } } suspend fun test() { val test = suspendCoroutine { it.resume(10) } } fun doSomething(cont: MyContinuationImpl) { println("doSomething") when(cont.label) { 0 -> { cont.label = 1 signup(cont) } 1 -> { val userData = cont.result cont.label = 2 updateSharedPreference(userData as String, cont) } 2 -> { moveFragmentMenuList() } else -> { throw IllegalStateException("") } } } fun signup(cont: MyContinuationImpl) { println("signup() 호출") println("현재 label -> ${cont.label}") val apiResult = "네트워크 통신 -> Ezhoon 데이터" cont.resumeWith(Result.success(apiResult)) } fun updateSharedPreference(user: String, cont: MyContinuationImpl) { println("updateSharedPreference(), 호출") println("현재 label -> ${cont.label}") cont.resumeWith(Result.success(user)) } fun moveFragmentMenuList() { println("moveFragmentMenuList(), 호출") } ``` image

5장 코루틴: 언어 차원에서의 지원 vs 라이브러리

여기는 별 내용이 없고, Coroutine에 대한 개념이 언어 차원에서 지원을 하는지, 라이브러리를 사용해야 하는 것인지에 대한 내용을 설명하고 있다.

Jwhyee commented 4 months ago

[1장] 코틀린 코루틴을 배워야 하는 이유

리액티브 프로그래밍

데이터 흐름에 초점을 맞추어 동작하며, 데이터를 비동기적으로 처리하고, 이벤트 기반 아키텍처를 통해 실시간으로 데이터의 변화에 반응할 수 있도록하는 프로그래밍

책에 나온 것과 같이 비동기 연산을 하기 위해 스레드 전환 방식을 사용하기엔 컨텍스트 스위칭 비용이 많이 들고, 스레드가 실행된 상태에서 멈출 수 있는 방법이 없어 메모리 누수로도 이어질 수 있다. 그러자니 콜백을 사용하기엔 읽기도 어렵고, 콜백 지옥에 빠지기 쉽다.

이러한 상황에서 리액티브 프로그래밍을 사용할 경우 쉽게 해결할 수 있다. 이 방식을 사용하면 데이터 스트림 내에서 일어나는 모든 연산을 시작, 처리, 관찰할 수 있다.

fun onCreate() {
    disposables += getNewsFromApi()
        .subscribeOn(Shedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .map { news ->
            news.sortedByDescending { it.publishedAt }
        }
        .subscribe { sortedNews ->
            view.showNews(sortedNews)
        }
}

우선, getNewsFromApi의 반환 타입은 Observables이라고 가정하고 흐름을 살펴보자.

  1. subscribeOn
    • getNewsFromApi에서 받아온 데이터를 토대로 데이터 흐름을 만듦
    • 앞으로의 연산을 할 스레드로 Shedulers.io() 지정
  2. observeOn
    • 최종 연산 후 컨텍스트 스위칭할 스레드를 지정
  3. subscribe
    • 최종 연산을 observeOn에 지정한 스레드에서 진행

이렇게 리액티브 프로그래밍을 했을 때의 장점은 메모리 누수도 없으며, 취소도 가능하고, 스레드를 적절하게 사용할 수 있다. 하지만 이를 사용하기 위한 함수들을 배워야하고, 기존 코드를 Observable과 같은 클래스로 래핑하는 작업도 해야하는 번거로움이 있다.

코틀린 코루틴

코루틴을 사용하면 기존 코드를 메인 스레드를 블로킹 없이 사용할 수 있다. 또한, 코루틴을 중단시켜도, 스레드는 블로킹되지 않으며, 다른 코루틴을 실행하는 등의 작업도 가능하다.

fun onCreate() {
    viewModelScope.launch {
        val news = getNewsFromApi()
        val sortedNews = news
            .sortedByDescending { it.publishedAt }
        view.showNews(sortedNews)
    }
}

위 코드와 같이 리액티브 라이브러리를 사용한 것보다 훨씬 간편하게 사용이 가능하다. 만약 3개의 엔드포인트를 호출해야할 경우 다음과 같이 작성할 수 있다.

fun showNews() {
    viewModelScope.launch {
        val config = async { getConfigFromApi() }
        val news = async { getNewsFromApi(config.await()) }
        val user = async { getUesrFromApi() }
        view.showNews(user.await(), news.await())
    }
}

showNews()가 호출될 경우 async가 붙은 모든 함수 호출을 병렬로 처리한다. news의 경우 config가 필요하기 때문에 해당 함수의 처리가 완료될 때까지 다른 코루틴 블록을 처리하게 된다.

백엔드의 코루틴

백엔드에서의 코루틴 사용은 단순히 suspend 제어자를 추가하는 것만으로도 쉽게 구현할 수 있다.

suspend fun getArticle(
    key: String,
    lang: Language
): ArticleJson? {
    return articleRepository.getArticle(articleKy, lang)
        ?.let { toArticleJson(it) }
}
suspend fun getAllArticles(
    userId: String?,
    lang: Language
): List<ArticleJson> = coroutineScope {
    val user = async { userRepo.findUserById(userId) }
    val articles = articleRepo.getArticles(lang)
    articles
        .filter { hasAccess(user.await(), it) }
        .map { toArticleJson(it) }
}

이렇게 코루틴을 사용하는 가장 중요한 이유는 스레드를 사용하는 비용이 크기 때문이다. 수백만 명의 사용자들이 앱을 사용할 때, DB 혹은 다른 서비스로부터 응답을 기다릴 때마다 블로킹이 이뤄진다면, 메모리나 프로세스 사용에 큰 비용이 들 것이다.

대용량 처리

백엔드에서 10만 건의 데이터를 저장한다고 가정해보자.

@PostConstruct  
private fun dataInit() {  
    for (i in 0 until 100_000) {  
        val result = random()  
        val mul = result * 10  
        repo.save(Member(null, mul.toInt().toString(), mul.toInt()))  
    }  
}

10만 건의 데이터를 처리하는데 걸리는 시간은 대략 50초가 걸린다.

13:07:48 - start
13:08:37 - end

그렇다면 코루틴을 사용하면 어떨까?

@PostConstruct  
private fun dataInit() {  
    log.info("data init start")  
    val scope = CoroutineScope(Dispatchers.Default)   
    scope.launch {  
        val jobs = mutableListOf<Deferred<Member>>()  
        for (i in 0 until 100_000) {  
            val result = random()  
            val mul = result * 10  
            val job = async { 
                repo.save(Member(null, mul.toInt().toString(), mul.toInt()))
            }  
            jobs.add(job)  
        }  
        jobs.awaitAll()  
        log.info("repo.findAll().size = ${repo.findAll().size}")  
    }  

}

코루틴을 사용할 경우 대략 15초가 나온다. 모든 코루틴을 시작하는 비용은 스레드 생성해 처리하는 것에 비해 비교도 안 될 정도로 저렴하고, 사람이 인지할 수 없는 정도이다.

[2장] 시퀀스 빌더

코틀린의 시퀀스는 List, Set과 비슷한 개념이지만, 필요할 때마다 값을 하나씩 계산하는 지연(lazy) 처리 한다. 다음은 시퀀스의 특징이다.

이러한 특징 때문에 값을 순차적으로 계산해 필요할 때 반환하는 빌더를 정의하는 것이 좋다.

val seq = sequence {
    yield(1)
    yield(2)
    yield(3)
}

fun main() {
    for(num in seq) print(num)
}

yield를 만나면 자신을 호출한 곳으로 다시 돌아가고, 다시 num을 찾기 위해 seq 내부로 들어가면 이전에 실행을 한 yield(1) 이후부터 진행하는 것을 알 수 있다. 이렇게 중단이 가능하기 때문에 main 함수와 시퀀스 제너레이터가 번갈아가면서 실행된다.

코루틴을 사용하지 않을 경우 스레드를 이용할 수 있지만, 그럴 경우 유지하고 관리하는 비용이 크게 든다. 하지만 코루틴은 중단이 가능하고, 이전 지점으로 다시 돌아가 다음 값을 생성할 수 있다.

[3장] 중단은 어떻게 작동할까?

중단

중단(suspend) 함수는 코틀린 코루틴의 핵심이다. 중단이 가능하다는 건 코틀린 코루틴의 다른 모든 개념의 기초가 되는 필수적인 요소이다.

코루틴을 중단한다는 것은 게임에서 저장을 하고 체크 포인트에서부터 시작하는 것처럼 실행을 중간에 멈추는 것을 의미한다. 코루틴은 중단되었을 때 Continuation 객체를 반환한다. 이 객체는 멈췄던 곳에서 다시 코루틴을 실행할 수 있도록 하는 객체이다.

스레드의 경우 저장이 불가능하고 멈추는 것만 가능하지만 코루틴의 경우 멈추는 것도 가능하고, 어디까지 진행했는지 저장도 할 수 있는 것이다. 이 점이 코루틴의 강력한 도구이다.

재개

재개란, 게임을 할 때 '이어서 하기'를 누르면 저장했던 체크 포인트에서 시작하는 느낌이다. 작업을 재개하려면 코루틴이 필요하다.

suspend fun main() {
    println("Before")
    println("After")
}

위 코드의 경우 두 줄 모두 출력이 된다. 그렇다면 중간에 중단 함수를 넣으면 어떻게 될까?

suspend fun main() {
    println("Before")
    suspendCoroutine<Unit> { }
    println("After")
}

이 경우에는 Before만 출력되고 이후 작업은 진행되지 않는다. Before 이후에 중단된 상태로 main 함수가 끝나지 않았기 때문이다.

suspend fun main() {
    println("Before")
    suspendCoroutine<Unit> { continuation ->
        continuation.resume(Unit)
    }
    println("After")
}

이렇게 resume()을 호출해야지 코루틴을 중단하고 기존 작업을 재개한다.

suspend fun main() {
    println("Before")
    suspendCoroutine<Unit> { continuation ->
        println("Suspended")
        Thread.sleep(1000L)
        continuation.resume(Unit)
        println("Resume")
    }
    println("After")
}
Before
Suspended
(1초 Sleep)
After
Resume

이렇게 스레드를 재우는 방법 보다는 JVM이 제공하는 ScheduledExecutorService를 대신해서 사용할 수 있다. 이는 정해진 시간이 지나면 continuation.resume(Unit)을 호출하도록 알람을 설정하는 기능이다.

private val executor = Executors.newSingleThreadScheduledExecutor() {  
    Thread(it, "scheduler").apply { isDaemon = true }  
}  

suspend fun delay(timeMillis: Long): Unit =  
    suspendCoroutine { cont ->  
        executor.schedule({  
        cont.resume(Unit)  
    }, timeMillis, TimeUnit.MILLISECONDS)  
}  

suspend fun main() {  
    println("Before")  
    delay(1000)   
    println("After")  
}

이전 코드가 하나의 스레드를 블로킹한다. 하지만 위 코드의 executor는 스레드를 사용하긴 하지만 delay 함수를 사용하는 모든 코루틴의 전용 스레드이다.

값으로 재개하기

API를 호출해 네트워크 응답을 기다리는 것처럼 특정 데이터를 기다리기 위해 중단되는 상황은 자주 발생한다. 스레드는 특정 데이터가 필요한 지점까지 비즈니스 로직을 수행하고, 이후 네트워크 라이브러리를 통해 데이터를 요청한다. 코루틴이 없다면 스레드는 응답을 기다릴 수 밖에 없다.

하지만 코루틴을 사용하면 중단함과 동시에 데이터를 받고 난 뒤, resume 함수를 통해 받을 수 있다. 그러면 기존 메인 스레드는 멈추지 않고, 다른 작업을 계속할 수 있다. 그러던 중 데이터가 도착할 경우 스레드는 코루틴이 중단된 지점에서 재개(resume)하게 된다.

private suspend fun apiTest() {  
    println("BEFORE")  
    val user = requestUser()  
    println(user)  
    println("AFTER")  
}  

private suspend fun requestUser() = suspendCoroutine<User> { cont ->  
    requestUser {  
        cont.resume(it)  
    }  
}  

private fun requestUser(block: (User) -> Unit) {  
    block(createUser())  
}

함수가 아닌 코루틴을 중단시킨다.

중단 함수는 코루틴이 아니고, 단지 코루틴을 중단할 수 있는 함수라고 할 수 있다. 아래 코드는 변수에 Continuation 객체를 저장하고, 함수를 호출한 다음에 재개하는 코드이다.

var continuation: Continuation<Unit>? = null  

suspend fun main() {  
    println("BEFORE")  
    suspendAndSetContinuation()  
    continuation?.resume(Unit)  
    println("AFTER")  
}  

suspend fun suspendAndSetContinuation() {  
    suspendCoroutine<Unit> { cont ->  
        continuation = cont  
    }  
}

위 코드는 의도와 달리 resume이 호출되지 않아 종료되지 않는다. 이를 실행되게 하려면 resume 코드를 suspendAndSetContinutation 함수 내부의 suspendCoroutine 안으로 옮겨야 한다. 혹은 함수 전체를 coroutineScope 내부에서 실행되도록 해야 한다.

var continuation: Continuation<Unit>? = null  

suspend fun main() = coroutineScope {  
    println("BEFORE")  

    launch {  
        delay(1000)  
        continuation?.resume(Unit)  
    }  
    suspendAndSetContinuation()  
    println("AFTER")  
}  

suspend fun suspendAndSetContinuation() {  
    suspendCoroutine<Unit> { cont ->  
        continuation = cont  
    }  
}

하지만 위 코드는 메모리 누수가 발생한다. 그 이유는 suspendAndSetContinuation() 함수에서 suspendCoroutine을 사용하여 continuation을 설정하고 있지만, 해당 코루틴이 재개(resume)되지 않은 채로 launch 함수가 호출되기 때문입니다. launch 함수 내에서 delay(1000) 함수를 사용하여 1초 후에 resume()이 호출되도록 하였지만, suspendAndSetContinuation() 함수가 완료되지 않은 상태에서 resume()이 호출되면 해당 continuation은 여전히 유지된다.

[4장] 코루틴의 실제 구현

내용을 제대로 이해하지 못했음.

코루틴의 동작 과정 중 중요한 점은 다음과 같다.

Jaeeun1083 commented 4 months ago

코루틴의 동작 과정

Kotlin 컴파일러가 suspend 키워드를 만나면 어떻게 동작할까?

1. CPS (continuation-passing style)

suspend fun postItem(item: Item): Post {
    val token = requestToken()
    val post = createPost(token, item)
    return processPost(post)
}
     ↓ 
fun postItem(item: Item, cont: Continuation<Post>) { 
    val token = requestToken()
    val post = createPost(token, item)
    cont.resume(processPost(post))
}

2. State machine - Suspension Points를 기준으로 코드 블록이 구분

fun postItem(item: Item, cont: Continuation<Post>) { 
    // LABEL 0
    val token = requestToken()
    // LABEL 1
    val post = createPost(token, item)
    // LABEL 2
    cont.resume(processPost(post))
}
fun postItem(item: Item, cont: Continuation<Post>) {
    switch (label) {
        case 0:
            val token = requestToken()
        case 1:
            val post = createPost(token, item)
        case 2:
            processPost(post)
    }
}

3. State machine - label과 각 suspend fun 내부 변수들이 관리

resumeWith는 어떻게 동작할까? (BaseContinuationImpl)

BaseContinuationImpl 클래스를 통해 resumeWith 의 동작 방식을 알아보자.

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {

    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted()
                if (completion is BaseContinuationImpl) {
                    current = completion
                    param = outcome
                } else {
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
    ...
}

resumeWith