Jwhyee / kotlin-coroutine-study

코틀린 코루틴 북 스터디 📚
4 stars 1 forks source link

3부 채널과 플로우 - 1 #5

Open lee-ji-hoon opened 3 months ago

lee-ji-hoon commented 3 months ago

16,17,18 장

lee-ji-hoon commented 3 months ago

Channel 초반부를 읽다가 의문이 갔던 내용들 정리

여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있습니다. 하지만 원소를 적절하게 처리하려면 반드시for루프를 사용해야 합니다(consumeEach`는 여러 개의 코루틴이 사용하기에는 안전하지 않습니다.

여기서 consumeEach는 왜 안전하지 않고 for 는 안전하다는걸까? 라는 의문에서 시작이 되었다.

/**
 * Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
 * the channel after the execution of the block.
 * If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
    consume {
        for (e in this) action(e)
    }

매우 간단하게 해결이 되었다. 주석을 보면 작업(action)을 수행하고 channel을 취소하는 특성을 갖고 있기 때문이다.

테스트 코드

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceNumbers() = produce {
    repeat(10) {
        delay(100)
        send(it)
    }
}

fun CoroutineScope.launchForProcessor(
    id: Int,
    channel: ReceiveChannel<Int>
) = launch {
    for (msg in channel) {
        println("#$id received $msg")
    }
}

fun CoroutineScope.launchConsumeProcessor(
    id: Int,
    channel: ReceiveChannel<Int>
) = launch {
    channel.consumeEach { msg ->
        println("#$id received $msg")
    }
}

suspend fun main(): Unit = coroutineScope {
    val channel = produceNumbers()
    repeat(3) { id ->
        delay(10)
        // 주석 번갈아가면서 실행
//        launchForProcessor(id, channel)
        launchConsumeProcessor(id, channel)
    }
}
for consumeEach
image image

주석만 봤을 때 1번이 끝나면 해당 채널을 닫아 버리니까 출력이 다를 줄 알았는데 실제로는 동일한 모습이라서 조금 더 파보았다.

파보기

image

try - catch - finally로 되어 있으며 가장 핵심은 finally - cancelConsumed 이다.

tryblock이 끝나면 ReceiveChannelcancel을 명시적으로 호출을 하면서 CancellationException 을 발생시킨다.

그렇기에 ReceiveChannel 이 무조건 닫힌다 그렇기에 안전하지 못하다 라는 결론이 나온거 같다.

Jwhyee commented 3 months ago

무한 대기 상태 이슈

아래 두 코드는 모두 채널에 보관된 모든 원소를 소비하는 것에서 동일한 코드이다. 하지만 첫 번째 코드는 프로그램이 정상 종료되지만, 두 번째 코드는 대기 상태에 빠지게 된다.

suspend fun main(): Unit = coroutineScope {  
    val channel = Channel<Int>()  
    launch {  
        repeat(5) { idx ->  
            delay(1000)  
            println("Producing next one")  
            channel.send(idx * 2)  
        }  
    }  
    launch {  
        repeat(5) { idx ->  
            val receive = channel.receive()  
            println(receive)  
        }
    }  
}
suspend fun main(): Unit = coroutineScope {  
    val channel = Channel<Int>()  
    launch {  
        repeat(5) { idx ->  
            delay(1000)  
            println("Producing next one")  
            channel.send(idx * 2)  
        }  
    }  
    launch {
        channel.consumeEach {  
            println(it)  
        }  
    }
}

너무도 당연하지만, 첫 번째 코드는 단순히 repeat을 통해서 receive를 하기 때문에 5번의 반복 이후 코루틴이 종료된다. 하지만, 두 번째 코드는 채널을 순회하면서 원소를 모두 소비(action)하게 된다.

public suspend inline fun <E> ReceiveChannel<E>.consumeEach(
    action: (E) -> Unit
): Unit = consume {  
    for (e in this) action(e)  
}

채널은 원소를 모두 소비했다고 닫히지 않는다. 책에 나오듯이, 채널에 원소가 없다면 코루틴은 원소가 들어올 때까지 중단한다. 즉, 위 코드에서는 모든 원소를 소비해서 아무 원소가 없기 때문에 계속 무한 중단 상태에 빠지게 되는 것이다.

아래 produce 빌더를 사용한 코드는 원소를 모두 소비하면 종료하게 된다. 이 코드는 어떻게 동작하는걸까?

suspend fun channelProduceTest(): Unit = coroutineScope{  
    val channel = produce {  
        repeat(5) {  
            println("Producing next one")  
            delay(1000)  
            send(it * 2)  
        }  
        channel.close()  
    }  
    for (i in channel) {  
        println(i)  
    }  
}
internal fun <E> CoroutineScope.produce(  
    context: CoroutineContext = EmptyCoroutineContext,  
    capacity: Int = 0,  
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,  
    start: CoroutineStart = CoroutineStart.DEFAULT,  
    onCompletion: CompletionHandler? = null,  
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit  
): ReceiveChannel<E> {  
    val channel = Channel<E>(capacity, onBufferOverflow)  
    val newContext = newCoroutineContext(context)  
    val coroutine = ProducerCoroutine(newContext, channel)  
    if (onCompletion != null) 
        coroutine.invokeOnCompletion(handler = onCompletion)  
    coroutine.start(start, coroutine, block)  
    return coroutine  
}

딱히 코드만 봐서는 close()가 되는 곳을 찾을 수 없다. 하지만 coroutine 객체를 생성하는 곳을 보면 알 수 있다.

private class ProducerCoroutine<E>( 
    parentContext: CoroutineContext, channel: Channel<E>  
) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> {  
    override val isActive: Boolean  
        get() = super.isActive  

    override fun onCompleted(value: Unit) {  
        _channel.close()  
    }  

    override fun onCancelled(cause: Throwable, handled: Boolean) {  
        val processed = _channel.close(cause)  
        if (!processed && !handled) 
            handleCoroutineException(context, cause)  
    }  
}

onCompleted() 함수 내부에 channel.close() 함수가 포함되어 있다. 즉, coroutine.start()를 통해 코루틴을 시작한 뒤, 해당 코루틴이 완료 상태로 바뀔 경우 해당 함수가 호출되면서 채널도 함께 닫히게 되는 것이다.

lee-ji-hoon commented 3 months ago

다음주

19, 20, 21 장

추가 정리하면 좋을 내용들