Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Immediate dispatchers can cause spooky action at a distance #3760

Open dkhalanskyjb opened 1 year ago

dkhalanskyjb commented 1 year ago

Dispatchers.Unconfined, Dispatchers.Main.immediate, etc., have an interesting property that many people rely on. Namely, when coroutines with those dispatchers are resumed in the right context, they start executing immediately, without going through a dispatch. For example, launch(Dispatchers.Unconfined) { flow.collect { ... } } allows one to observe all emissions that happen in a hot flow; they do not get conflated.
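A minimal sketch of this property, assuming a MutableStateFlow as the hot flow (a StateFlow conflates values for collectors that are slow to resume) and a collector launched in Dispatchers.Unconfined; observedValues is a hypothetical helper name. Nothing suspends between the updates, so a dispatched collector could miss intermediate values, but here each assignment to state.value resumes the suspended collector in place, on the same call stack.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records what an Unconfined collector sees while values are set in a tight loop.
fun observedValues(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    // Unconfined: the collector starts in place and is later resumed in place by each update.
    val job = launch(Dispatchers.Unconfined) {
        state.collect { seen.add(it) }
    }
    // No suspension between updates: a dispatched collector could observe only a conflated subset.
    for (i in 1..5) state.value = i
    job.cancel()
    seen
}
```

Calling observedValues() should return [0, 1, 2, 3, 4, 5].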

We are recommending Dispatchers.Unconfined for that purpose:

People are recommending such dispatchers to each other for that purpose:

This could cause problems. An example is provided below. The issue that prompted this is https://github.com/Kotlin/kotlinx.coroutines/issues/3506, which describes a similar scenario. I could have simplified the example a lot, but I thought that structuring it the way real code might look would be more illustrative.

There's a library A, which doesn't know about coroutines at all, with the following functions:

```kotlin
import kotlin.random.Random

// MainThread stands for the application's main thread (a Thread) and is assumed to offer
// schedule { ... }, which runs the block later on that thread.

private var a = 0
val subscribers = mutableListOf<(Int) -> Unit>()

/**
 * Performs some work.
 * Can only be called from the main thread.
 */
fun doSomething() {
  ensureMainThread()
  // do some work
  var i = 10
  while (i >= 5) {
    i = Random.nextInt(100)
  }
  val original = a
  // exit the function, scheduling the update to the magic number and notifying everyone
  MainThread.schedule {
    if (a != original) return@schedule
    val newValue = original + i
    subscribers.forEach {
      it(newValue)
      check(original == a) { "Some subscriber changed the magic number" }
    }
    a = newValue
  }
}

/**
 * Subscribes to the magic number changes.
 * Guaranteed to invoke the [block] after each change to the magic number is requested but before the change takes effect.
 * It is forbidden to call [doSomething] from [block], as this could confuse other subscribers.
 *
 * Can only be called from the main thread.
 */
fun subscribe(block: (Int) -> Unit) {
  ensureMainThread()
  subscribers.add(block)
}

/**
 * Accesses the data.
 * Can only be called when the magic number is less than 100.
 * Can only be called from the main thread.
 * @throws [IllegalStateException] if the magic number is at least 100.
 */
fun access(): Int {
  ensureMainThread()
  return if (a < 100) a * a else throw IllegalStateException("Access forbidden")
}

private fun ensureMainThread() {
  if (Thread.currentThread() != MainThread)
    throw IllegalStateException("Can only be called from the main thread")
}
```

There's a tiny library B that wraps library A:

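```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

object MagicNumberWatcher {
  // Without a buffer slot, tryEmit fails whenever a collector is subscribed.
  val magicNumberFlow = MutableSharedFlow<Int>(extraBufferCapacity = 1)

  init {
    // emit is a suspend function and can't be called from the plain (Int) -> Unit callback,
    // so the non-suspending tryEmit stands in for it here.
    subscribe { magicNumberFlow.tryEmit(it) }
  }
}
```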

We want to write client code that uses libraries B and A correctly, without ever having an access violation.

We have this collection procedure:

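```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
// Inside some CoroutineScope:
launch(Dispatchers.Main.immediate) {
  // first { ... } collects until the predicate returns true, then stops collecting.
  MagicNumberWatcher.magicNumberFlow.first {
    println(access())
    it >= 100 // once this value takes effect, access() would throw, so stop here
  }
}
```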

We use Dispatchers.Main.immediate because we want the collection procedure to be entered for every emission, without any conflation. Otherwise, we could easily miss the moment when the magic number becomes at least 100. We can't just infer the previous state of the magic number from the current one, because it changes unpredictably.

In some other place, we have

```kotlin
launch(Dispatchers.Main.immediate) {
  doSomething()
}
```

We use Dispatchers.Main.immediate because we simply want doSomething to be executed on the main thread; we don't care whether a dispatch happens to ensure that. Everything works just fine.

At a later point, the maintainers of library A notice that doSomething always runs on the main thread anyway, and that code that would cause a stack overflow already fails due to the check, so there's no need to additionally reschedule part of the operation. doSomething becomes this:

```kotlin
fun doSomething() {
  ensureMainThread()
  // do some work
  var i = 10
  while (i >= 5) {
    i = Random.nextInt(100)
  }
  // update the magic number right away, notifying everyone first
  val original = a
  val newValue = original + i
  subscribers.forEach {
    it(newValue)
    check(original == a) { "Some subscriber changed the magic number" }
  }
  a = newValue
}
```

The code suddenly becomes incorrect for no apparent reason. We have a bug, but who introduced it?
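Roughly, what changed: launch(Dispatchers.Main.immediate) { doSomething() } already runs inside the immediate dispatcher's event loop, so when the now-synchronous doSomething notifies the subscribers and the flow resumes the collector (also in Dispatchers.Main.immediate), that resumption is queued instead of executed in place. The collector then runs only after a = newValue, and access() throws once the magic number is at least 100. In the original version, the notification ran inside MainThread.schedule, presumably outside any coroutine, so the collector was resumed in place before the update took effect. The sketch below is a deliberately simplified, coroutine-free model of that ordering; ImmediateLoop, LibraryA, and runScenario are hypothetical names, and the model is not how kotlinx.coroutines is implemented internally.

```kotlin
// Hypothetical single-threaded model of an immediate dispatcher's event loop (NOT the real
// implementation): a task runs in place, unless another task of this loop is already running
// up the call stack, in which case it is queued until the outermost task returns.
class ImmediateLoop {
    private val queue = ArrayDeque<() -> Unit>()
    private var active = false

    fun execute(task: () -> Unit) {
        if (active) {
            queue.addLast(task) // nested: defer instead of running in place
            return
        }
        active = true
        try {
            task()
            while (queue.isNotEmpty()) queue.removeFirst()()
        } finally {
            active = false
        }
    }
}

// Library A reduced to what matters here, with the rewritten, synchronous doSomething.
class LibraryA(var magic: Int) {
    val subscribers = mutableListOf<(Int) -> Unit>()

    fun access(): Int =
        if (magic < 100) magic * magic else throw IllegalStateException("Access forbidden")

    fun doSomething(increment: Int) {
        val newValue = magic + increment
        subscribers.forEach { it(newValue) } // notify first...
        magic = newValue                     // ...then commit
    }
}

// Returns what the "collector" observed when it was resumed for the new value.
fun runScenario(callerIsImmediateTask: Boolean): String {
    val lib = LibraryA(magic = 95)
    val loop = ImmediateLoop()
    var outcome = "collector never ran"
    // Stand-in for the collector coroutine: each notification resumes it through the loop.
    lib.subscribers.add {
        loop.execute {
            outcome = try {
                "access() = ${lib.access()}"
            } catch (e: IllegalStateException) {
                "access() threw: ${e.message}"
            }
        }
    }
    if (callerIsImmediateTask) {
        loop.execute { lib.doSomething(5) } // like launch(Dispatchers.Main.immediate) { doSomething() }
    } else {
        lib.doSomething(5) // like a plain main-thread callback, outside any immediate task
    }
    return outcome
}
```

In this model, runScenario(callerIsImmediateTask = false) returns "access() = 9025", while runScenario(callerIsImmediateTask = true) returns "access() threw: Access forbidden": the same subscriber code behaves differently depending only on what is running further up the call stack.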

I admit that the example seems contrived, but I do believe that, in large enough code bases where many people work, this spooky action at a distance could easily happen eventually.

The root of the issue is that, by introducing the concept of the event loop, we added a requirement for every function to disclose whether it ever executes user-supplied code in the call stack from which it was called, and for client code to act on that disclosure. This is very similar to the requirements for avoiding stack overflows: user-supplied code shouldn't call operations that call user-supplied code. However, it's more intrusive than that: for a stack overflow to happen, we need a long chain in the call graph, most typically of the form A -> B -> A -> ..., whereas for immediate dispatchers to trigger an error condition, it's enough to call an immediate-dispatcher-using B from an immediate-dispatcher-using A just once. Moreover, at least in single-threaded scenarios, guarding against stack overflow chains can always be done by remembering whether A is already executing somewhere up the chain and failing in A if so, which prevents client code from causing unbounded recursion.
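A minimal sketch of the single-threaded guard described above, where A remembers whether it is already on the call stack and fails if it is re-entered; Notifier, addListener, and notifyListeners are hypothetical names, not part of any library discussed here.

```kotlin
// Hypothetical library entry point A that calls user-supplied listeners on a single thread.
class Notifier {
    private val listeners = mutableListOf<() -> Unit>()
    private var notifying = false // true while notifyListeners() is somewhere up the call stack

    fun addListener(listener: () -> Unit) {
        listeners.add(listener)
    }

    fun notifyListeners() {
        // Fail fast on A -> user code -> A -> ... instead of recursing without bound.
        check(!notifying) { "notifyListeners() must not be called from a listener" }
        notifying = true
        try {
            listeners.forEach { it() }
        } finally {
            notifying = false
        }
    }
}
```

A listener that calls notifyListeners() gets an IllegalStateException on its first reentrant call, rather than a StackOverflowError after thousands of frames.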

elizarov commented 1 year ago

Great observation. There is a tradeoff between a potential stack overflow and a change in execution order introduced by queuing. Maybe the stack overflow is actually the lesser evil here.

qwwdfsad commented 1 year ago

Nice write-up; the framing with "two libraries and a user" puts the problem in a completely different perspective.

Here are the original discussion and PR (https://github.com/Kotlin/kotlinx.coroutines/issues/381, #425), with some observations:

The problem with lifting the SoE (StackOverflowError) protection is that, at first glance, doing so seems mostly harmless. The most potentially harmful pattern is two communicating coroutines, both launched in an immediate dispatcher, especially if their communication is continuous and depends on user input (so there is a high chance that such problems manifest themselves in production).
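A minimal sketch of the "two communicating coroutines" pattern, assuming Dispatchers.Unconfined for both sides and rendezvous channels; pingPong is a made-up name. Every send resumes the other coroutine, which would nest ever deeper if each resumption ran directly on the caller's stack. With the current SoE protection, the nested resumptions are queued on the thread's event loop instead, so even a long exchange completes without the stack growing.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: two Unconfined coroutines exchange `rounds` messages over rendezvous channels.
fun pingPong(rounds: Int): Int = runBlocking {
    val toPong = Channel<Int>() // rendezvous: each send waits for a matching receive
    val toPing = Channel<Int>()
    // "pong" answers every message with the value incremented by one.
    launch(Dispatchers.Unconfined) {
        repeat(rounds) { toPing.send(toPong.receive() + 1) }
    }
    var last = 0
    // "ping" sends the last value and waits for the reply; each send/receive resumes the other side.
    launch(Dispatchers.Unconfined) {
        repeat(rounds) {
            toPong.send(last)
            last = toPing.receive()
        }
    }.join()
    last
}
```

Each round increments the value once, so pingPong(100_000) returns 100000.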

From another perspective, maybe we can do both: instead of pessimistically forming an event loop the moment there is any nesting, execute coroutines in place optimistically until some arbitrarily defined nesting limit is reached (shifting our assumptions towards the optimistic side). That would solve the originally reported change of behaviour while still providing proper SoE protection. On the other hand, the problem would still be there: it would just require more unlikely events to happen at once, which would definitely hide it from any reasonable testing.
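The optimistic variant suggested above can be modelled in the same simplified, coroutine-free way: nested tasks run in place until a nesting limit is reached, and only deeper ones are queued. BoundedImmediateLoop and maxDepth are hypothetical names, and the limit is arbitrary, as in the proposal. With maxDepth = 1, the model degenerates to queuing every nested task, which roughly corresponds to today's pessimistic event loop.

```kotlin
// Hypothetical model of "run in place optimistically up to a limit, then queue".
// Not the kotlinx.coroutines implementation; maxDepth is an arbitrary, caller-chosen limit.
class BoundedImmediateLoop(private val maxDepth: Int) {
    private val queue = ArrayDeque<() -> Unit>()
    private var depth = 0 // number of this loop's tasks currently on the call stack

    fun execute(task: () -> Unit) {
        when {
            depth == 0 -> {
                // Outermost call: run the task, then drain everything that was deferred.
                runInPlace(task)
                while (queue.isNotEmpty()) runInPlace(queue.removeFirst())
            }
            depth < maxDepth -> runInPlace(task) // shallow nesting: run immediately
            else -> queue.addLast(task)          // too deep: defer, keeping the stack bounded
        }
    }

    private fun runInPlace(task: () -> Unit) {
        depth++
        try {
            task()
        } finally {
            depth--
        }
    }
}
```

For instance, with maxDepth = 2, a task nested directly inside another one still runs in place, while a task nested two levels deep is deferred until the outermost task returns. The reordering does not disappear; it just needs deeper nesting to show up.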