Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Flow multiplexing #4103

Open odedniv opened 1 month ago

odedniv commented 1 month ago

Use case

  1. I have a data source that can only be subscribed to once per process (specifically, this one; note that it is set...Callback rather than add...Callback).
  2. On subscription, I provide a set of things I want from it (specifically [this](https://developer.android.com/reference/kotlin/androidx/health/services/client/data/PassiveListenerConfig#dataTypes()) - note that it's a Set).
  3. Different usages in my program conditionally require a specific set of things from the data source (specifically, one code path that, if executed, wants [this](https://developer.android.com/reference/kotlin/androidx/health/services/client/data/DataType#HEART_RATE_BPM()), and another that wants [that](https://developer.android.com/reference/kotlin/androidx/health/services/client/data/DataType#STEPS())).
  4. I need a single manager that tracks the requirements of all usages, subscribes to the data source with the union of those requirements, and feeds each usage only the data it requested.
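
To make step 4 concrete, here is a minimal sketch of the bookkeeping such a manager needs: a reference count per key, so the union of requirements only changes when the first usage of a key starts or the last one stops. `KeyDemand` is a hypothetical name used only for illustration, the string keys stand in for the real data types, and the sketch deliberately ignores thread safety.

```kotlin
// Hypothetical helper for illustration; not part of kotlinx.coroutines or Health Services.
// Not thread-safe: a real manager would need synchronization around these calls.
class KeyDemand<K> {
    private val refCounts = mutableMapOf<K, Int>()

    /** Snapshot of the union of keys requested by all active usages. */
    val union: Set<K> get() = refCounts.keys.toSet()

    /** Registers one usage of [keys]; returns true if the union gained a key. */
    fun acquire(keys: Set<K>): Boolean {
        var changed = false
        for (key in keys) {
            val count = refCounts[key] ?: 0
            if (count == 0) changed = true
            refCounts[key] = count + 1
        }
        return changed
    }

    /** Unregisters one usage of [keys]; returns true if the union lost a key. */
    fun release(keys: Set<K>): Boolean {
        var changed = false
        for (key in keys) {
            val count = refCounts[key] ?: continue
            if (count == 1) {
                refCounts.remove(key)
                changed = true
            } else {
                refCounts[key] = count - 1
            }
        }
        return changed
    }
}

fun main() {
    val demand = KeyDemand<String>()
    println(demand.acquire(setOf("HEART_RATE_BPM")))          // true: first request for this key
    println(demand.acquire(setOf("HEART_RATE_BPM", "STEPS"))) // true: STEPS is new
    println(demand.acquire(setOf("STEPS")))                   // false: already covered
    println(demand.release(setOf("STEPS")))                   // false: another usage still wants STEPS
    println(demand.union)                                     // [HEART_RATE_BPM, STEPS]
}
```

Only a `true` result would require re-subscribing to the single data source with the new union.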

The Shape of the API

An example implementation and API can be seen at https://github.com/Kotlin/kotlinx.coroutines/compare/master...odedniv:kotlinx.coroutines:multiplex. In general:

/**
 * Constructs a [MultiplexFlow].
 *
 * Behavior:
 * * [getAll] is called every time the total set of keys collected by flows returned by [MultiplexFlow.get] changes (when a collection is started or stopped).
 * * [getAll] is called with the total keys of all collected [MultiplexFlow.get] flows.
 * * [MultiplexFlow.get] calls share the data between them, such that [getAll] is not invoked when all the keys provided to [MultiplexFlow.get] are already collected by another [MultiplexFlow.get] caller.
 *   If [replay] is 0, this rule does not apply and [getAll] is re-invoked for every change in collections.
 * * Errors in calls to [getAll] trigger a rollback to the previous keys, and collections of all [MultiplexFlow.get] with one of the new keys will throw that error.
 * * A follow-up [getAll] error, or an error after the [getAll] collection has already succeeded, clears all subscriptions and causes all [MultiplexFlow.get] collections to throw that error.
 * * If the flow returned by [getAll] finishes, all current collections of [MultiplexFlow.get] finish as well, and follow-up collections will re-invoke [getAll].
 */
public fun <K, V> MultiplexFlow(
    scope: CoroutineScope,
    replay: Int = 1,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    getAll: suspend (keys: Set<K>) -> Flow<Map<K, V>>,
): MultiplexFlow<K, V>

/**
 * Allows multiplexing multiple subscriptions to a single [Flow].
 *
 * This is useful when the source allows only a single subscription, but the data is needed by multiple users.
 */
public class MultiplexFlow<K, V> internal constructor(...) {
    /** Returns a [Flow] that emits [V] for the requested [K]s, based on the map provided by `getAll`. */
    public operator fun get(vararg keys: K): Flow<V>
}

//
// Sample usage:
//

val multiplexFlow = MultiplexFlow<Int, DataValue>(scope) { keys: Set<Int> -> // keys of all requests, eventually {1, 2, 3}
  // Collection of this flow will be cancelled when the set of total keys is replaced.
  dataSourceFor(keys).map { values: List<DataValue> -> // values for all requests
    values.associateBy { it.key } // mapping to allow each user to get only the data they requested
  }
}
launch {
  multiplexFlow[1, 2].collect { value -> /* values with keys 1 or 2 */ }
}
launch {
  multiplexFlow[2, 3].collect { value -> /* values with keys 2 or 3 */ }
}

data class DataValue(val key: Int, ...)
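
As a rough illustration of the per-collector side of the sample above, the following sketch shows how a combined `Flow<Map<K, V>>` could be projected onto the keys one collector asked for. `valuesFor` is a hypothetical helper invented for this example, and the emission order within a single map is an assumption; the linked branch may do this differently.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits, for every incoming map, the values of the requested keys only.
fun <K, V> Flow<Map<K, V>>.valuesFor(keys: Set<K>): Flow<V> = transform { snapshot ->
    for (key in keys) {
        // Keys missing from this snapshot are skipped.
        snapshot[key]?.let { emit(it) }
    }
}

fun main() = runBlocking {
    // Stand-in for the flow returned by getAll for the key set {1, 2, 3}.
    val combined = flowOf(
        mapOf(1 to "a", 2 to "b", 3 to "c"),
        mapOf(2 to "b2"),
    )
    println(combined.valuesFor(setOf(1, 2)).toList()) // [a, b, b2]
    println(combined.valuesFor(setOf(2, 3)).toList()) // [b, c, b2]
}
```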

Prior Art

This is similar to a SharedFlow, except for these distinctions:

  1. It is aware of subscription of specific data requested, rather than a single global requirement.
  2. Each collector only gets the specific data it wants, rather than everything.

Because of these distinctions, a solution built on existing APIs needs multiple SharedFlows, which is hard to get right when a single data source has to feed all of them. Note that the suggested implementation in https://github.com/Kotlin/kotlinx.coroutines/compare/master...odedniv:kotlinx.coroutines:multiplex is actually based on maintaining multiple SharedFlows, each feeding the specific users.

Implementing this is very error-prone with respect to both thread safety and lifecycle (e.g. rolling back when a new user requests data that fails to be fetched). It took many iterations in my project to get this right, so I thought it would be a good idea to have an implementation in kotlinx.coroutines.
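
For comparison, here is a deliberately naive approximation built only from existing operators (`MutableStateFlow`, `flatMapLatest`, `shareIn`, `onSubscription`). `NaiveMultiplexer` is a hypothetical class written for this discussion and is not the implementation in the linked branch. It has no replay and no rollback: an exception from `getAll` fails the sharing coroutine in `scope` instead of being delivered to the affected collectors, which is exactly the kind of lifecycle handling that is hard to get right.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical class for illustration only; NOT the proposed MultiplexFlow.
@OptIn(ExperimentalCoroutinesApi::class) // flatMapLatest is experimental
class NaiveMultiplexer<K, V>(
    scope: CoroutineScope,
    getAll: suspend (keys: Set<K>) -> Flow<Map<K, V>>,
) {
    // Reference count per key; its key set is the union of all active requests.
    private val refCounts = MutableStateFlow<Map<K, Int>>(emptyMap())

    // Re-invokes getAll whenever the union changes, cancelling the previous collection.
    private val shared: SharedFlow<Map<K, V>> = refCounts
        .map { it.keys.toSet() }
        .distinctUntilChanged()
        .flatMapLatest { keys -> if (keys.isEmpty()) emptyFlow<Map<K, V>>() else getAll(keys) }
        .shareIn(scope, SharingStarted.Eagerly, replay = 0)

    operator fun get(vararg keys: K): Flow<V> = flow {
        val wanted = keys.toSet()
        var registered = false
        try {
            shared
                .onSubscription {
                    // Register only once subscribed, so the first map is not missed.
                    refCounts.update { counts ->
                        counts + wanted.associateWith { (counts[it] ?: 0) + 1 }
                    }
                    registered = true
                }
                .collect { snapshot ->
                    for (key in wanted) snapshot[key]?.let { emit(it) }
                }
        } finally {
            // Runs when the collector is cancelled or fails; drops this request's keys.
            if (registered) refCounts.update { counts ->
                counts.toMutableMap().apply {
                    for (key in wanted) {
                        val remaining = getValue(key) - 1
                        if (remaining == 0) remove(key) else put(key, remaining)
                    }
                }
            }
        }
    }
}

fun main() = runBlocking {
    val scope = CoroutineScope(coroutineContext + Job())
    val mux = NaiveMultiplexer<Int, String>(scope) { keys ->
        flow {
            emit(keys.associateWith { "value-$it" })
            awaitCancellation() // stay subscribed until the key set changes
        }
    }
    println(mux[1, 2].take(2).toList()) // should print [value-1, value-2]
    scope.cancel()
}
```

Even this small version needs care around subscription ordering (`onSubscription`) and cleanup (`finally`), and it still lacks the error semantics described in the KDoc above.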

odedniv commented 1 month ago

Ignore the specific commit referenced above; see https://github.com/Kotlin/kotlinx.coroutines/compare/master...odedniv:kotlinx.coroutines:multiplex for the most up-to-date proposal.