Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Adding combine(...) methods to combine 6-10 flows #3598

Open koncinar opened 1 year ago

koncinar commented 1 year ago

Use case

I'm working on an Android app, specifically on a screen for composing a new post for social media. We're using Jetpack Compose, following the NowInAndroid example. This means we have a bunch of private states and the final UI state is the combination of those, like:

private val config: StateFlow<Config> = getConfigUseCase()
private val text: StateFlow<String> = MutableStateFlow("")
private val scheduledDate: StateFlow<Long?> = MutableStateFlow(null)
...

val state = combine(
  config,
  text,
  scheduledDate,
  ...,
  ::toUiState
)

The screen is fairly complex and has a lot of local state, so we hit the limit of the method

combine(
  flow: Flow<T1>, 
  flow2: Flow<T2>, 
  flow3: Flow<T3>, 
  flow4: Flow<T4>, 
  flow5: Flow<T5>, 
  transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R>

which accepts at most 5 flows. We then tried the provided vararg overload

combine(vararg flows: Flow<T>, crossinline transform: suspend (Array<T>) -> R): Flow<R>

but we didn't like the manual casting at every call site, so we ended up writing our own combine(...) overloads that support up to 10 flows, like this:

// `inline` is required because the transform parameter is declared `crossinline`.
inline fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, R> combine(
  flow: Flow<T1>,
  flow2: Flow<T2>,
  flow3: Flow<T3>,
  flow4: Flow<T4>,
  flow5: Flow<T5>,
  flow6: Flow<T6>,
  flow7: Flow<T7>,
  flow8: Flow<T8>,
  flow9: Flow<T9>,
  flow10: Flow<T10>,
  crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7, T8, T9, T10) -> R,
): Flow<R> = kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6, flow7, flow8, flow9, flow10) { args: Array<*> ->
  @Suppress("UNCHECKED_CAST")
  transform(
    args[0] as T1,
    args[1] as T2,
    args[2] as T3,
    args[3] as T4,
    args[4] as T5,
    args[5] as T6,
    args[6] as T7,
    args[7] as T8,
    args[8] as T9,
    args[9] as T10,
  )
}

This raises the question of whether these overloads should live in the Coroutines code itself, specifically in flow.operators.Zip.

If you'd be interested in this, I can add combine methods for 6, 7, 8, 9 and 10 elements. Let me know.
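For context, the casting that the vararg overload forces at each call site looks roughly like the sketch below. SixFieldState and sixFieldState are hypothetical names invented for illustration; only combine itself comes from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine

// Hypothetical UI state with six inputs, for illustration only.
data class SixFieldState(
    val a: Int,
    val b: String,
    val c: Long?,
    val d: Boolean,
    val e: Double,
    val f: Char,
)

fun sixFieldState(
    a: Flow<Int>,
    b: Flow<String>,
    c: Flow<Long?>,
    d: Flow<Boolean>,
    e: Flow<Double>,
    f: Flow<Char>,
): Flow<SixFieldState> =
    // Six flows only fit the vararg overload. The element type is pinned to Any?
    // here, so every value has to be cast back by position.
    combine<Any?, SixFieldState>(a, b, c, d, e, f) { values ->
        SixFieldState(
            values[0] as Int,
            values[1] as String,
            values[2] as Long?,
            values[3] as Boolean,
            values[4] as Double,
            values[5] as Char,
        )
    }
```

A wrong index or a wrong cast here only fails at runtime, which is the cost the typed overloads are meant to hide.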

elizarov commented 1 year ago

Assume there is such a combine overload. Now, your code will look like this:

val state = combine(
  config,
  text,
  scheduledDate,
  ...,
  ::toUiState
)

What's wrong with this code? You have to match the order of the leading arguments in the combine call to the order of the parameters of your toUiState function. That is fine if all the types are different, but if there are a few String types in there, it creates ample opportunity to introduce a bug that is very hard to notice.
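A minimal sketch of that failure mode, using the existing two-flow combine overload. NameUiState, toNameUiState and swappedNameDemo are hypothetical names used only for this illustration.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical state with two parameters of the same type.
data class NameUiState(val firstName: String, val lastName: String)

fun toNameUiState(firstName: String, lastName: String) =
    NameUiState(firstName, lastName)

fun swappedNameDemo(): Pair<NameUiState, NameUiState> = runBlocking {
    val firstName = MutableStateFlow("Ada")
    val lastName = MutableStateFlow("Lovelace")

    // Flow order matches the parameter order of toNameUiState.
    val intended = combine(firstName, lastName, ::toNameUiState).first()
    // Flow order is swapped; both are Flow<String>, so this still compiles.
    val swapped = combine(lastName, firstName, ::toNameUiState).first()

    intended to swapped
}
```

Both calls type-check, so the compiler cannot flag the second one; the mistake only shows up in the resulting state.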

Wouldn't it be better to create a special DSL for multi-parameter combine like this:

val state = combine { 
    toUiState(
        config.asValue(),
        text.asValue(),
        scheduledDate.asValue(),
        ...    
    )
}

Here, asValue would be a special extension defined in the scope of the combine { ... } DSL that adds the corresponding flow to the list of flows being combined and returns its current value.

This way, it will be possible to easily invoke IDE's parameter assistance to check that you are passing appropriate arguments to your toUiState call.
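No such DSL exists in kotlinx.coroutines. As a rough sketch of how the idea could work, the code below restricts itself to StateFlow (so a current value is always available) and discovers the flows with one dry run of the block. CombineScope, combineValues and this asValue are all hypothetical, and the dry run is an assumption of this sketch rather than anything proposed in the thread.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine

// Hypothetical scope: records every StateFlow that the block reads.
class CombineScope {
    val sources = LinkedHashSet<StateFlow<*>>()

    // Registers the flow as a source and returns its current value.
    fun <T> StateFlow<T>.asValue(): T {
        sources += this
        return value
    }
}

// Hypothetical builder, not part of kotlinx.coroutines.
fun <R> combineValues(block: CombineScope.() -> R): Flow<R> {
    // Dry run: execute the block once, only to find out which flows it reads.
    val discovered = CombineScope().also { it.block() }.sources.toList()
    // Re-run the block whenever any discovered flow emits. The emitted array is
    // ignored; the block re-reads the current values through asValue().
    return combine<Any?, R>(discovered) { CombineScope().block() }
}
```

With two MutableStateFlow instances a and b, a call would read combineValues { "${a.asValue()}-${b.asValue()}" }. The sketch misses flows that are only read in a branch the dry run does not take, and it runs the block once eagerly, so it is an illustration of the shape of the API rather than a usable implementation.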

whyoleg commented 1 year ago

Just want to mention that something like this is already possible out of the box with Compose (the runtime alone, not the UI) or with Molecule. Example from the README:

@Composable
fun ProfilePresenter(
  userFlow: Flow<User>,
  balanceFlow: Flow<Long>,
): ProfileModel {
  val user by userFlow.collectAsState(null)
  val balance by balanceFlow.collectAsState(0L)

  return if (user == null) {
    Loading
  } else {
    Data(user.name, balance)
  }
}

//somewhere in your code
val userFlow = db.users()
val balanceFlow = db.balances()
val models: StateFlow<ProfileModel> = scope.launchMolecule(clock = ContextClock) {
  ProfilePresenter(userFlow, balanceFlow)
}

(It works in a similar way for both Flow and StateFlow; ProfilePresenter can be inlined into scope.launchMolecule.) This scales to as many arguments as needed and, IMO, is also much more scalable overall and easier to use.

As a side note, I think tight out-of-the-box integration with the Compose runtime (in the coroutines or compose-runtime repositories) would serve such use cases better than inventing one more solution inside coroutines itself.

elizarov commented 1 year ago

Molecule is, indeed, more scalable when you want to write complex code, but, IMO, it is somewhat heavier for simpler cases. What we should definitely look into is conceptual alignment between those APIs, so that if you use both, your code looks similar and it is easier to switch between the two styles. E.g., we could also provide a delegation style similar to val user by userFlow.collectAsState(null) in the combine DSL, as well as support a "default value" (like null here) for combined streams out of the box.

While we are here, it might also be worth having a DSL for a multi-flow merge. E.g., you can have different sources of events that you want to merge into a single event stream, wrapping the different event types into a common sealed class hierarchy:

val eventFlow = merge { 
    mouseClicksFlow._tbd_name_ { MouseClickEvent(it) }
    keyPressesFlow._tbd_name_ { KeyPressEvent(it) }
    // etc
}

CoreFloDev commented 1 year ago

Hi,

I don't know if a DSL would make more sense, but you can already merge an arbitrary number of flows. Reusing your example, it would look like this:

val eventFlow = listOf( 
    mouseClicksFlow._tbd_name_ { MouseClickEvent(it) },
    keyPressesFlow._tbd_name_ { KeyPressEvent(it) },
    // etc
).merge()
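A runnable version of the same idea might look like the sketch below, with the existing map operator standing in for the `_tbd_name_` placeholder. The InputEvent base class and the payload shapes of MouseClickEvent and KeyPressEvent are assumptions made for illustration; merge and map are existing kotlinx.coroutines operators.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge

// Hypothetical sealed hierarchy for the merged event stream.
sealed class InputEvent
data class MouseClickEvent(val x: Int, val y: Int) : InputEvent()
data class KeyPressEvent(val key: Char) : InputEvent()

fun mergedEvents(
    mouseClicks: Flow<Pair<Int, Int>>,
    keyPresses: Flow<Char>,
): Flow<InputEvent> = merge(
    // Wrap each source's values in the common event type before merging.
    mouseClicks.map { (x, y) -> MouseClickEvent(x, y) },
    keyPresses.map { KeyPressEvent(it) },
)
```

The vararg merge(...) and the listOf(...).merge() form are equivalent here; neither guarantees any ordering between events from different sources.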

RomanMarinov commented 1 year ago

koncinar, please write the methods for 6-10 flows.

hoc081098 commented 1 year ago

@RomanMarinov check my library https://github.com/hoc081098/FlowExt


koncinar commented 1 year ago

@elizarov and @whyoleg are you suggesting using Jetpack Compose's derivedStateOf like this?

private var config by mutableStateOf(Config.EMPTY)
private var text by mutableStateOf("")
private var scheduledDate by mutableStateOf<Long?>(null)
...

val state by derivedStateOf {
  toUiState(
    config = config,
    text = text,
    scheduledDate = scheduledDate,
  )
}

That would work for me, as I'm using this for rendering Compose anyway.

But how do I convert a Coroutines' Flow into Compose's State? collectAsState(initial, context) looks like what I need but it's a @Composable function so I can't call it from the ViewModel. For example if you connect to Room DB and you get Flow<Config> through a UseCase like this:

private val configFlow = getConfigUseCase.invoke()
private var config: State<Config> = configFlow.collectAsState(Config.EMPTY) // <- doesn't work

Observing it in the init { ... } block sounds like it's breaking the reactive pattern.

koncinar commented 1 year ago

Also, if toUiState were a suspend function because it does a lot of work, how would I run it on a different thread? derivedStateOf runs on the main thread.
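For comparison, on the Flow side (rather than with derivedStateOf) the transform can be moved off the calling thread with the existing flowOn operator. A minimal sketch, assuming a hypothetical DraftUiState and a hypothetical suspend mapper toDraftUiState:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOn

// Hypothetical UI state, for illustration only.
data class DraftUiState(val text: String, val scheduledDate: Long?)

// Hypothetical stand-in for an expensive suspend mapping.
suspend fun toDraftUiState(text: String, scheduledDate: Long?): DraftUiState =
    DraftUiState(text.trim(), scheduledDate)

fun draftUiState(
    text: Flow<String>,
    scheduledDate: Flow<Long?>,
): Flow<DraftUiState> =
    combine(text, scheduledDate) { t, d -> toDraftUiState(t, d) }
        // flowOn changes the context of everything upstream of it, so the
        // combine transform above runs on Dispatchers.Default.
        .flowOn(Dispatchers.Default)
```

The collector still receives values in its own context; a ViewModel could then expose the result as a StateFlow with stateIn.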

Zhuinden commented 1 year ago

This means we have a bunch of private states and the final UI state is the combination of those, like:

private val config: StateFlow<Config> = getConfigUseCase()
private val text: StateFlow<String> = MutableStateFlow("")
private val scheduledDate: StateFlow<Long?> = MutableStateFlow(null)
...

val state = combine(
  config,
  text,
  scheduledDate,
  ...,
  ::toUiState
)

fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, R> combine(...

This raises the question of whether these overloads should live in the Coroutines code itself, specifically in flow.operators.Zip.

If you'd be interested in this, I can add combine methods for 6, 7, 8, 9 and 10 elements. Let me know.

I've found this to be a common requirement and popular request, so for JVM/Android platforms, I have written this up to accept 36 flows.

You can see it here: https://github.com/Zhuinden/flow-combinetuple-kt/