Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Merge operator not working as expected (ensure that merge is collecting) #1831

Status: Closed (closed by brendanw 3 years ago)

brendanw commented 4 years ago

Branch: native-mt (last commit: 10979a350426f23c02356c66acfda1e765d23450)
Gradle task: macosX64Test

This test passes:

  @Test
  fun bindTestPass() = runTest {
    val channel = BroadcastChannel<Int>(Channel.BUFFERED)
    val initialized = atomic(false)
    withTimeoutOrNull(5000) {
      launch(Dispatchers.Default) {
        // No merge here: the flow is collected directly
        channel.asFlow().asBindFlow {
          expect(1)
          initialized.value = true
        }.collect {
          expect(3)
        }
      }
      launch(Dispatchers.Default) {
        while(!initialized.value) {

        }
        expect(2)
        channel.send(0)
      }
    }
    channel.close()
    finish(4)
  }

This test fails:

  @Test
  fun bindTestFail() = runTest {
    val channel = BroadcastChannel<Int>(Channel.BUFFERED)
    val initialized = atomic(false)
    withTimeoutOrNull(5000) {
      launch(Dispatchers.Default) {
        //merge is what is breaking the flow
        merge(channel.asFlow()).asBindFlow {
          expect(1)
          initialized.value = true
        }.collect {
          expect(3)
        }
      }
      launch(Dispatchers.Default) {
        while(!initialized.value) {

        }
        expect(2)
        channel.send(0)
      }
    }
    channel.close()
    finish(4)
  }

Both tests depend on asBindFlow, defined here:

@InternalCoroutinesApi
fun <T> Flow<T>.asBindFlow(onBind: () -> Unit): Flow<T> {
  return BindFlow(onBind = onBind, flow = this)
}

@InternalCoroutinesApi
class BindFlow<T>(val onBind: () -> Unit, val flow: Flow<T>) : Flow<T> {
  private var hasBinded = atomic(false)

  @InternalCoroutinesApi
  override suspend fun collect(collector: FlowCollector<T>) {
    if (hasBinded.compareAndSet(expect = false, update = true)) {
      onBind()
    }
    flow.collect {
      collector.emit(it)
    }
  }
}

brendanw commented 4 years ago

I believe the mistake is on my end. I assumed that each flow combined by the merge function would immediately start dispatching emissions to the channel backing the merged flow. The naive merge implementation that is commented out in the source (without fusing) shows that this assumption is wrong: a coroutine is launched per flow passed to merge, and there is no guarantee where each of those coroutines is in its execution at the moment collect is called on the channelFlow representing the merged flows.
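
A minimal sketch of that naive shape may make the timing issue easier to see. It is not the library's actual merge; naiveMerge is a hypothetical name used only for illustration, and the sketch assumes a kotlinx.coroutines version in which channelFlow is available.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper, NOT the library's merge: one child coroutine per source flow.
fun <T> naiveMerge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    for (flow in flows) {
        // launch returns immediately. The source flow is collected (and, for a
        // BroadcastChannel-backed flow, subscribed) only once this child runs.
        launch {
            flow.collect { value -> send(value) }
        }
    }
}

fun main(): Unit = runBlocking {
    val merged = naiveMerge(flowOf(1, 2), flowOf(3)).toList()
    // Interleaving between sources is not guaranteed, so print the sorted values.
    println(merged.sorted())
}
```

Because collect on the merged flow only starts the child coroutines, a callback that fires when the outer collect begins says nothing about whether any source has been subscribed yet.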

This ended up being my approach:

fun <T> List<Flow<T>>.onBindMerge(onBind: () -> Unit): Flow<T> {
  val boundFlows = atomic(0)
  return channelFlow {
    forEach { flow ->
      launch {
        flow.asBindFlow {
          if (boundFlows.incrementAndGet() == size) onBind()
        }.collect {
          send(it)
        }
      }
    }
  }
}

@InternalCoroutinesApi
fun <T> Flow<T>.asBindFlow(onBind: () -> Unit): Flow<T> {
  return BindFlow(onBind = onBind, flow = this)
}

@InternalCoroutinesApi
class BindFlow<T>(val onBind: () -> Unit, val flow: Flow<T>) : Flow<T> {
  private var hasBinded = atomic(false)

  @InternalCoroutinesApi
  override suspend fun collect(collector: FlowCollector<T>) {
    if (hasBinded.compareAndSet(expect = false, update = true)) {
      onBind()
    }
    flow.collect {
      collector.emit(it)
    }
  }
}

I use this to implement an MVI/Redux-like StateMachine:

class StateMachine<R : Any, T>(
  val scope: CoroutineScope,
  private val initialState: T,
  private val sideEffects: List<(Flow<R>, () -> T) -> Flow<R>>,
  private val reducer: suspend (accumulator: T, value: R) -> T
) {
  val viewState: FlowRelay<T> = FlowRelay()
  private var isInitialized = atomic(false)
  private val inputActions: BroadcastChannel<R> = BroadcastChannel(Channel.BUFFERED)

  init {
    var job: Job? = scope.launch {
      val lastState = StateWrapper(initialState)
      val flowList = sideEffects.map { sideEffect ->
        sideEffect(inputActions.asFlow(), { lastState.state })
      }.run {
        toMutableList().apply {
          add(inputActions.asFlow())
        }
      }
      flowList.onBindMerge {
          isInitialized.value = true
        }
        .onEach { kprint("result $it") }
        .onCompletion { inputActions.cancel() }
        .scan(lastState.state, reducer)
        .collect { outputState ->
          lastState.state = outputState
          viewState.send(outputState)
        }
    }
  }

  fun dispatchAction(action: R) = scope.launch {
    kprint("Received input action $action")
    while (!isInitialized.value) {
      yield()
    }
    inputActions.send(action)
  }
}

I'm sharing this for context: the original motivation was to ensure that no action passed to dispatchAction is sent to the channel until the final collector in the stream is guaranteed to be collecting. I now realize that what I actually want is a guarantee that all collectors in the stream are collecting before the first action is sent to the inputActions channel.

Please let me know if there is a more idiomatic way to accomplish this.

brendanw commented 4 years ago

https://github.com/Kotlin/kotlinx.coroutines/issues/1758

I believe this issue is related.

elizarov commented 4 years ago

Thanks for explaining your use-case.

I'm trying to understand what you want to achieve here, and I do not understand why you need to wait until the stream is collecting. Can't you simply replace your BroadcastChannel with a plain Channel and send to it from your dispatchAction function, being sure that the action will be consumed on the other side?
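
As a rough illustration of that suggestion (a sketch only; sendBeforeCollecting is a hypothetical name and the delay merely simulates a collector that starts late): with a rendezvous Channel, send suspends until a receiver takes the element, so nothing is dropped when the sender starts first.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: the sender starts before any collector exists.
suspend fun sendBeforeCollecting(): List<Int> = coroutineScope {
    // Default capacity is rendezvous: send suspends until a receiver arrives.
    val actions = Channel<Int>()
    launch {
        actions.send(1)
        actions.send(2)
        actions.close() // lets the consuming flow complete
    }
    delay(100) // simulate a collector that starts late
    actions.consumeAsFlow().toList()
}

fun main(): Unit = runBlocking {
    println(sendBeforeCollecting())
}
```

The trade-off is that a plain Channel hands each element to a single receiver, so it does not multicast on its own.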

brendanw commented 4 years ago

A rendezvous channel is what I started with, but I switched to BroadcastChannel so I could use asFlow for multicasting. I took another stab at using a plain Channel and arrived at this:

fun <T> Channel<T>.multicast(scope: CoroutineScope): BroadcastChannel<T> {
  val channel = this
  return scope.broadcast {
    for (x in channel) {
      send(x)
    }
  }.also {
    it.invokeOnClose { channel.cancel() }
  }
}

class StateMachine<R : Any, T>(
  val scope: CoroutineScope,
  private val initialState: T,
  private val sideEffects: List<(Flow<R>, () -> T) -> Flow<R>>,
  private val reducer: suspend (accumulator: T, value: R) -> T
) {
  val viewState: FlowRelay<T> = FlowRelay()
  private val inputActions: Channel<R> = Channel()

  init {
    scope.launch {
      val scope = this
      val lastState = StateWrapper(initialState)
      val multicaster = inputActions.multicast(scope)
      val flowList = sideEffects.map { sideEffect ->
        sideEffect(multicaster.asFlow(), { lastState.state })
      }.run {
        toMutableList().apply {
          add(multicaster.asFlow())
        }
      }
      flowList.merge()
        .onEach { println("result $it") }
        .scan(lastState.state, reducer)
        .distinctUntilChanged()
        .collect { outputState ->
          lastState.state = outputState
          viewState.send(outputState)
        }
    }
  }

  fun dispatchAction(action: R) = scope.launch {
    println("Received input action $action")
    inputActions.send(action)
  }
}

This works on the JVM and feels more idiomatic. On iOS, however, I get an InvalidMutabilityException, which the onBindMerge implementation does not produce:

Caused by: kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen com.basebeta.utility.mvi.$multicast$lambda-0COROUTINE$33@ad2628
        at 0   BaseBetaCommonApi                   0x000000010a230637 kfun:kotlin.Throwable.<init>(kotlin.String?)kotlin.Throwable + 87
        at 1   BaseBetaCommonApi                   0x000000010a2297e5 kfun:kotlin.Exception.<init>(kotlin.String?)kotlin.Exception + 85
        at 2   BaseBetaCommonApi                   0x000000010a229325 kfun:kotlin.RuntimeException.<init>(kotlin.String?)kotlin.RuntimeException + 85
        at 3   BaseBetaCommonApi                   0x000000010a25e015 kfun:kotlin.native.concurrent.InvalidMutabilityException.<init>(kotlin.String)kotlin.native.concurrent.InvalidMutabilityException + 85
        at 4   BaseBetaCommonApi                   0x000000010a25f7e8 ThrowInvalidMutabilityException + 680
        at 5   BaseBetaCommonApi                   0x000000010a7d2758 MutationCheck + 104
        at 6   BaseBetaCommonApi                   0x000000010a251260 kfun:kotlin.coroutines.native.internal.ContinuationImpl.<set-intercepted>#internal + 96
        at 7   BaseBetaCommonApi                   0x000000010a25186c kfun:kotlin.coroutines.native.internal.ContinuationImpl.releaseIntercepted() + 428
        at 8   BaseBetaCommonApi                   0x000000010a252022 kfun:kotlin.coroutines.native.internal.BaseContinuationImpl.resumeWith(kotlin.Result<kotlin.Any?>) + 1234
        at 9   BaseBetaCommonApi                   0x000000010a3c89ec kfun:kotlinx.coroutines.DispatchedTask.run() + 2780
        at 10  BaseBetaCommonApi                   0x000000010a66ab65 kfun:com.basebeta.utility.dispatchers.UI.dispatch$lambda-0#internal + 85
        at 11  BaseBetaCommonApi                   0x000000010a66adab kfun:com.basebeta.utility.dispatchers.UI.$dispatch$lambda-0$FUNCTION_REFERENCE$27.invoke#internal + 59
        at 12  BaseBetaCommonApi                   0x000000010a66ae0b kfun:com.basebeta.utility.dispatchers.UI.$dispatch$lambda-0$FUNCTION_REFERENCE$27.$<bridge-UNN>invoke()#internal + 59

My best guess is that the channel is frozen and that calling send is the mutation that triggers the exception, but I am not certain.

I tried extracting the relevant code into a standalone Kotlin/Native example (https://github.com/brendanw/kotlin-native-statemachine). My goal was to reproduce the mutability exception there, but the code does not execute the same way. I also noticed that the behavior of runBlocking changed between Kotlin 1.3.61 and 1.3.70: if I am not mistaken, using runBlocking before 1.3.70 would prevent any other coroutine from running on the main thread, but that no longer seems to be the case.

I am all ears on how best to investigate.

elizarov commented 4 years ago

What version of kotlinx.coroutines are you using in Kotlin/Native? Note that you need a special -native-mt version to work across threads with K/N. See #462 for details.

brendanw commented 4 years ago

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-native:1.3.3-native-mt")

https://github.com/brendanw/kotlin-native-statemachine/blob/master/build.gradle.kts

It's set up to work with native-mt correctly. My comment about 1.3.70 sent the wrong signal: I am using that repository to test 1.3.70 as well, but that code is commented out.

elizarov commented 4 years ago

I'm having a hard time reproducing your problem. Could you please commit an actual test to your project?

brendanw commented 4 years ago

I isolated the problem to one file: https://github.com/brendanw/scope-native-error/blob/master/src/macosMain/kotlin/sample/SampleMacos.kt

Reproduction steps:

  1. Run that project as is

Expected result: "this coroutine runs" is printed to output.
Actual result: "this coroutine runs" is not printed to output.

Running with the normal 1.3.3 coroutines dependency works fine.

elizarov commented 4 years ago

What you are observing is a difference between Dispatchers.Main in the master branch and in the native-mt branch.

For the native-mt branch, you should change your main function like this:

import platform.CoreFoundation.CFRunLoopRun

fun main() {
    ViewModel()
    CFRunLoopRun() // Run Darwin event loop
}

In this case it works as expected.

You can see full details on the changes in Dispatchers.Main in this commit: https://github.com/Kotlin/kotlinx.coroutines/commit/674f84b0308f881d80c1f4069bff4ef521536d43

brendanw commented 4 years ago

Thank you. I was able to isolate the cause of the InvalidMutabilityException in a small Kotlin/Native sample:

fun main() {
   val channel = Channel<Int>()
   GlobalScope.launch(Dispatchers.Default) {
      broadcast {
         for (x in channel) {
            offer(x)
         }
      }.asFlow()
          .collect { println(it) }
   }
   CFRunLoopRun()
}

qwwdfsad commented 3 years ago

Fixed in 1.3.x-native-mt