Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Combine several flows declaratively #3775

Open dovchinnikov opened 1 year ago

dovchinnikov commented 1 year ago

Use case

Combine several flows in a more convenient (and possibly more efficient) way.

We have flows that are built by combining dozens of other flows. The existing combine offers type-safe overloads for at most 5 flows. Beyond that, the only option is the overload that takes an array of flows, and it only works if all the flows have the same type.
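To make the limitation concrete, here is a minimal sketch of what combining six differently typed flows looks like with the existing vararg overload. The function name describe and its parameters are hypothetical; the explicit type arguments and the positional casts are the type-unsafe part.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Six flows of mixed types: one more than the largest type-safe overload accepts.
fun describe(
    a: Flow<Int>, b: Flow<String>, c: Flow<Boolean>,
    d: Flow<Long>, e: Flow<Double>, f: Flow<Char>,
): Flow<String> =
    // The element type has to be widened to Any for the vararg overload to apply.
    combine<Any, String>(a, b, c, d, e, f) { values ->
        // Positional, unchecked casts: the compiler cannot verify these.
        val count = values[0] as Int
        val name = values[1] as String
        val rest = values.drop(2).joinToString(",")
        "$name=$count [$rest]"
    }

fun main() = runBlocking {
    val result = describe(
        flowOf(1), flowOf("a"), flowOf(true),
        flowOf(2L), flowOf(3.0), flowOf('z'),
    ).toList()
    println(result) // [a=1 [true,2,3.0,z]]
}
```

Reordering the arguments of describe compiles without complaint and fails only at runtime with a ClassCastException, which is the problem the proposal aims to remove.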

The Shape of the API

fun <T> combine(transform: suspend CombineScope.() -> T): Flow<T> = TODO()

interface CombineScope { // : CoroutineScope ?
  /**
   * Starts collection of [this] flow if not yet started. 
   * Suspends until [this] flow emits the first value.
   * @return the latest emitted value
   */
  suspend fun <T> Flow<T>.latestValue(): T

  /**
   * Starts collection of [this] flow if not yet started.
   * @return the latest emitted value or [initialValue] if [this] flow did not emit anything yet
   */
  suspend fun <T> Flow<T>.latestValue(initialValue: T): T
}

fun usage(ints: Flow<Int>, floats: Flow<Float>): Flow<String> = combine {
  val i = ints.latestValue()
  if (i < 0) {
    return@combine "x"
  }
  else {
    val f = floats.latestValue(initialValue = 2f)
    return@combine f.toString() + i.toString()
  }
}

Pros:

  1. No need to fall back to the type-unsafe varargs overload.
  2. Keeps the variable declaration close to the flow it reads: val i = ints.latestValue().
  3. Avoids collecting unnecessary flows: if the first value from ints is -1, then floats is not even discovered, and is not collected until ints emits a non-negative value. This can also be considered a con (see con №1).
  4. Avoids running transform unnecessarily: if both ints and floats are discovered and are being collected, and the latest value emitted by ints was < 0, then all values emitted by floats can be ignored until ints emits a new value, because "x" will be the result of transform regardless of the value of f.
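For comparison, the behavior described in pros №3 and №4 can be roughly approximated with existing operators. This is only a sketch under assumptions: usageToday is a hypothetical name, onStart { emit(2f) } stands in for latestValue(initialValue = 2f), and unlike the proposal, flatMapLatest re-collects floats from scratch each time ints emits a non-negative value.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun usageToday(ints: Flow<Int>, floats: Flow<Float>): Flow<String> =
    ints.flatMapLatest { i ->
        if (i < 0) {
            // floats is never collected on this branch.
            flowOf("x")
        } else {
            // Emit the initial value first, then follow floats.
            floats.onStart { emit(2f) }.map { f -> f.toString() + i.toString() }
        }
    }

fun main() = runBlocking {
    // A negative int short-circuits to "x" without touching floats.
    println(usageToday(flowOf(-1), flowOf(1f)).toList()) // [x]
}
```

The nesting grows with every additional flow, which is what the proposed combine { } block would flatten into sequential code.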

Cons:

  1. In the following case:
    combine {
      val i = ints.latestValue()
      val f = floats.latestValue()
    }

    collection of floats starts only after ints emits its first value, so the two flows are not collected concurrently from the start (see pro №3).

Prior Art

https://github.com/Kotlin/kotlinx.coroutines/issues/3598#issuecomment-1405276057
https://github.com/Kotlin/kotlinx.coroutines/issues/3598
https://github.com/cashapp/molecule

dovchinnikov commented 1 year ago

Let's imagine that suspend operator fun getValue is supported. Con №1 can then be fixed as follows:

interface CombineScope {
  suspend fun <T> Flow<T>.latestValue(): Value<T>
}

interface Value<T> {
  suspend operator fun getValue(thisRef: Nothing?, property: KProperty<*>): T
}

fun usage(ints: Flow<Int>, floats: Flow<Float>): Flow<String> = combine {
  val i by ints.latestValue() // starts collecting ints
  val f by floats.latestValue() // starts collecting floats
  // reads of delegated variables suspend until respective flows emit their first values
  i.toString() + f.toString() 
}

elizarov commented 1 year ago

I don't get why you would need to have val i by ints.latestValue() when val i = ints.latestValue() does the same thing, is shorter and more straightforward.

dovchinnikov commented 1 year ago

Since val i = ints.latestValue() returns a ready-to-use value, it resumes only after the ints flow emits its first value. Instead, latestValue can start the collection and return a handle (Value) right away, giving the client a chance to start collecting more flows concurrently.

// start collection, return without waiting for the emission
val vi: Value<Int> = ints.latestValue() 
// start another collection
val vf: Value<Float> = floats.latestValue()

But it's inconvenient to use:

val i: Int = vi.getValue() // suspends until the first emission
val f: Float = vf.getValue() // suspends until the first emission
i.toString() + f.toString()

So instead of writing vi.getValue() everywhere, this could (and should) be solved by delegates.
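The handle-based variant can be sketched with existing APIs to show the intended timing. Everything here is an assumption made for illustration: latestValueIn is a hypothetical stand-in for the proposed latestValue, it takes an explicit scope, and it relies on shareIn with replay = 1, which is not necessarily how a real CombineScope would work.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical handle: reading it suspends until the flow has emitted at least once.
interface Value<T> {
    suspend fun getValue(): T
}

// Hypothetical helper: starts collecting right away and returns without waiting.
fun <T> Flow<T>.latestValueIn(scope: CoroutineScope): Value<T> {
    val shared = shareIn(scope, SharingStarted.Eagerly, replay = 1)
    return object : Value<T> {
        // Returns the replayed latest value, or suspends until the first emission.
        override suspend fun getValue(): T = shared.first()
    }
}

fun demo(): String = runBlocking {
    // Separate Job so the eager collectors can be cancelled once the result is ready.
    val collectors = CoroutineScope(coroutineContext + Job())
    try {
        val vi = flowOf(1).latestValueIn(collectors)    // collection of ints is scheduled
        val vf = flowOf(2.5f).latestValueIn(collectors) // collection of floats is scheduled
        vi.getValue().toString() + vf.getValue().toString()
    } finally {
        collectors.cancel()
    }
}

fun main() {
    println(demo()) // 12.5
}
```

Both collections are started before either value is read, which is the property that the plain val i = ints.latestValue() form lacks.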


There is an almost one-to-one example in YouTrack:

val i: Int = computeI()
val f: Float = computeF() // called after computeI() resumes

To execute both computations concurrently we can use async:

val di: Deferred<Int> = async { computeI() }
val df: Deferred<Float> = async { computeF() }

But now we have to await to obtain the actual value:

val i: Int = di.await()

Instead, suspending property delegates would allow writing this:

val i: Int by async { computeI() }
val f: Float by async { computeF() }
i.toString() + f.toString() // i and f go through getValue and suspend until `async` is completed
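Until suspending delegates exist, the same computation has to be written with explicit await calls. Here is a minimal runnable sketch of that version; computeI and computeF are hypothetical placeholders with made-up delays and return values.

```kotlin
import kotlinx.coroutines.*

// Hypothetical computations; the delays only simulate work.
suspend fun computeI(): Int { delay(100); return 1 }
suspend fun computeF(): Float { delay(100); return 2.5f }

suspend fun concurrentResult(): String = coroutineScope {
    // Both computations start here and run concurrently.
    val di: Deferred<Int> = async { computeI() }
    val df: Deferred<Float> = async { computeF() }
    // Every read has to go through await() explicitly.
    di.await().toString() + df.await().toString()
}

fun main() = runBlocking {
    println(concurrentResult()) // 12.5
}
```

With suspending delegates, the two await() calls would move into getValue and the last line of concurrentResult would read i.toString() + f.toString().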