Jwhyee / kotlin-coroutine-study

코틀린 코루틴 북 스터디 📚
4 stars 1 forks source link

3부 채널과 플로우 - 3 #7

Open lee-ji-hoon opened 4 months ago

lee-ji-hoon commented 4 months ago

22, 23장

lee-ji-hoon commented 4 months ago

combine 내부 구조 파악

image

graph TD

A[combineInternal 함수 시작] --> B{flows 배열의 크기 확인}
B -->|0| C[return]
B -->|크기가 0이 아님| D[latestValues 배열 초기화]
D --> E[resultChannel 채널 생성]
E --> F[nonClosed 변수 생성 및 초기화]
F --> G[remainingAbsentValues 변수 생성 및 초기화]
G --> H[각 플로우에 대한 Coroutine 시작]

subgraph Coroutine Loop
    H --> I[각 플로우를 collect하여 resultChannel에 Update 전송]
    I --> J[Coroutine 종료 시 nonClosed 감소]
    J --> K{모든 플로우가 종료되었는지 확인}
    K -->|예| L[resultChannel 닫기]
    K -->|아니오| M[다음 Coroutine 처리]
end

H --> N[Batch 수신 최적화 시작]
N --> O[currentEpoch 증가]
O --> P{resultChannel에서 첫 번째 요소 수신}
P -->|수신 실패| Q[break - while 루프 종료]
P -->|수신 성공| R[latestValues 배열 업데이트]

subgraph 내부 while 루프
    R --> S{이전 값이 UNINITIALIZED인지 확인}
    S -->|예| T[remainingAbsentValues 감소]
    S -->|아니오| U[다음 요소 수신 시도]
    U --> V{같은 플로우의 두 번째 값인지 확인}
    V -->|예| W[break - 내부 while 루프 종료]
    V -->|아니오| X[lastReceivedEpoch 업데이트]
    X --> Y[다음 요소 수신 시도]
end

R --> Z{remainingAbsentValues가 0인지 확인}
Z -->|예| AA[arrayFactory를 통해 results 배열 생성]
AA --> AB{results가 null인지 확인}
AB -->|예| AC[transform 함수 호출 - latestValues 배열 사용]
AB -->|아니오| AD[latestValues 배열을 results 배열로 복사]
AD --> AE[transform 함수 호출 - results 배열 사용]
Jaeeun1083 commented 4 months ago

zip 함수의 내부 동작

두 플로우로부터 쌍을 만드는 zip 함수를 호출하면 내부에서 어떤 동작을 하는지 알아보자

/* Zip.kt */
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)

unsafeFlow

internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
unsafeFlow {
    coroutineScope {
        ...
    }
}

produce

val second = produce<Any> {
    flow2.collect { value ->
        return@collect channel.send(value ?: NULL)
    }
}

collectJob 생성 및 확인

val collectJob = Job()
(second as SendChannel<*>).invokeOnClose {
    // Optimization to avoid AFE allocation when the other flow is done
    if (collectJob.isActive) collectJob.cancel(AbortFlowException(collectJob))
=}

첫 번째 Flow 수집 및 변환

try {
    val scopeContext = coroutineContext
    val cnt = threadContextElements(scopeContext)
    withContextUndispatched(coroutineContext + collectJob, Unit) {
        flow.collect { value ->
            withContextUndispatched(scopeContext, Unit, cnt) {
                val otherValue = second.receiveCatching().getOrElse {
                    throw it ?: AbortFlowException(collectJob)
                }
                emit(transform(value, NULL.unbox(otherValue)))
            }
        }
    }
} catch (e: AbortFlowException) {
    e.checkOwnership(owner = collectJob)
} finally {
    second.cancel()
}
Jaeeun1083 commented 4 months ago

collectSafely

AbstractFlow 클래스를 보며 내부에서는 왜 SafeCollector 객체를 만들고 collectSafely를 호출할까? 이에 대해 간단히만 알아보자

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
}

collectSafely에 대한 설명은 다음과 같다.

이 메서드의 유효한 구현은 다음과 같은 제약 조건을 가진다.

  1. 값을 방출할 때 코루틴 컨텍스트를 변경하지 않아야한다
  2. 호출을 방출하기 위해 직렬화해야한다.

SafeCollector는 컨텍스트 보존이나 예외 투명성 불변 조건이 깨지지 않도록 보장하는 FlowCollector의 인스턴스이다.

emit 호출은 현재의 코루틴 컨텍스트를 캡처하고, 이전에 캡처한 컨텍스트와 다르지 않은지 확인한 후 계속 진행된다.

만약 다운스트림에서 예외가 발생하면, 이를 잡아내고 이후의 방출 시도는 IllegalStateException을 발생시킨다.

Flow 연산이 동일한 코루틴 컨텍스트 내에서 실행됨을 보장하고 예외가 발생하면 적절히 처리하기 위해서라고 할 수 있을 것 같다

Jwhyee commented 4 months ago

22. 플로우 생명주기 함수

onStart

onStart는 어떻게 onEach보다 먼저 호출이 되는 것일까?

suspend fun onStartTest() {  
   flowOf(1, 2, 3, 4)  
      .onEach { delay(1000) }  
      .onStart { println("Before") }  
      .collect { println(it) }  
}

위 코드를 보면 당연히 flowOf -> onEach -> onStart -> collect 순서로 시작한다. 하지만 onStart의 경우 책에 나온 것과 같이 첫 번째 원소를 요청했을 때 호출된다. 이게 어떻게 가능한걸까?

이는 디컴파일해보면 이유를 알 수 있다.

@Nullable  
public static final Object onStartTest(
   @NotNull Continuation $completion
) {  
   Object var10000 = FlowKt.onStart(
      FlowKt.onEach(
         FlowKt.flowOf(new Integer[]{
            Boxing.boxInt(1), 
            Boxing.boxInt(2), 
            Boxing.boxInt(3), 
            Boxing.boxInt(4)}
         ), 
    (Function2)(new Function2((Continuation)null) { 
            // ...
         }

위 코드와 같이 onStart의 경우 연쇄 함수 중에서도 가장 우선 순위가 높아 먼저 호출되도록 설계가 되어있다.

collect() 
   └─> onStart { println("Before") } 
                  └─> onEach { delay(1000) } 
                                 └─> collect { println(it) }

때문에 onStart 내부 코드를 보면, 인자로 받은 블록(action)을 실행하고, 남은 메소드 체이닝을 진행하기 위해 collect하는 것을 볼 수 있다.

public fun <T> Flow<T>.onStart(  
   action: suspend FlowCollector<T>.() -> Unit  
): Flow<T> = unsafeFlow { 
   // Note: unsafe flow is used here, 
   // but safe collector is used to invoke start action  
   val safeCollector = SafeCollector<T>(this, currentCoroutineContext())  
   try {  
      safeCollector.action()  
   } finally {  
      safeCollector.releaseIntercepted()  
   }  
   collect(this) // directly delegate  
}
Jwhyee commented 4 months ago