Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Feature: Flow.conflate with custom callback for merging elements #3884

Open kunyavskiy opened 1 year ago

kunyavskiy commented 1 year ago

Use case

I have some data that is represented as a flow of updates, and I need to compute a StateFlow of derived data, which can essentially be thought of as a Map<Int, SomeOtherData>.

Basic implementation would look like the following:

suspend fun compute(scope: CoroutineScope, updates: Flow<DataUpdate>) = updates
    .runningFold(emptyData()) { data, update -> data.merge(update) }
    .map { data -> data.getKeys().associateWith { slowComputation(it, data) } }
    .stateIn(scope)

Unfortunately, this code works too slowly. I have two optimizations I want to implement.

  1. As state flow conflates, we can add a conflate call before slow computation to avoid some recomputation.
  2. It's possible to work out which keys need to be recomputed because of an update, and that is sometimes quite a small set. So we could have something like:
suspend fun compute(scope: CoroutineScope, updates: Flow<DataUpdate>) = updates
    .runningFold(emptyData() to persistentMapOf<...>()) { (data, oldResult), update -> 
      val newData = data.merge(update)
      val newResult = oldResult.putAll(update.getKeys().associateWith { slowComputation(it, newData) })
      newData to newResult
    }
    .map { it.second }
    .stateIn(scope)
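For comparison, the first optimization on its own might look like the sketch below. `DataUpdate`, `Data`, `emptyData` and `slowComputation` are hypothetical stand-ins, since the issue does not define them; only `runningFold`, `conflate`, `map` and `stateIn` come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the types used in the issue.
data class DataUpdate(val key: Int, val value: String)

data class Data(val entries: Map<Int, String> = emptyMap()) {
    fun merge(update: DataUpdate) = Data(entries + (update.key to update.value))
    fun getKeys(): Set<Int> = entries.keys
}

fun emptyData() = Data()

// Placeholder for the expensive per-key work.
fun slowComputation(key: Int, data: Data): String = data.entries.getValue(key).uppercase()

// The pipeline without stateIn, so it can be collected directly.
fun derived(updates: Flow<DataUpdate>): Flow<Map<Int, String>> = updates
    .runningFold(emptyData()) { data, update -> data.merge(update) }
    .conflate() // keep only the latest state while the map below is busy
    .map { data -> data.getKeys().associateWith { slowComputation(it, data) } }

suspend fun computeConflated(scope: CoroutineScope, updates: Flow<DataUpdate>): StateFlow<Map<Int, String>> =
    derived(updates).stateIn(scope)
```

Here every state that survives conflation is still recomputed for all keys, which is exactly what the second optimization tries to avoid.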

The problem is I see no way of implementing both of them simultaneously, as after implementing the second optimization, I can't drop intermediate recalculations.

The Shape of the API

I think it would be nice to have a function like fun <T> Flow<T>.conflate(merge : (T, T) -> T) with semantics "If a new element comes before the old one is received downstream, they are both replaced with merge(old, new)".
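A minimal sketch of how such an operator could behave is shown below. It is not the library's API or implementation: the name `conflateWith` and the helper `Box` are hypothetical, and the approach (a `Mutex`-guarded pending slot plus a conflated `Channel` used as a wake-up signal) is just one possible way to get these semantics.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical wrapper so that a null element can be told apart from "nothing pending".
private class Box<T>(val value: T)

// Hypothetical operator name; not part of kotlinx.coroutines.
fun <T> Flow<T>.conflateWith(merge: (T, T) -> T): Flow<T> = flow {
    coroutineScope {
        val lock = Mutex()
        var pending: Box<T>? = null
        // Conflated signal: at most one "something is pending" token is buffered.
        val signal = Channel<Unit>(Channel.CONFLATED)

        launch {
            try {
                this@conflateWith.collect { new ->
                    lock.withLock {
                        val old = pending
                        pending = Box(if (old == null) new else merge(old.value, new))
                    }
                    signal.send(Unit) // never suspends on a conflated channel
                }
            } finally {
                signal.close() // lets the loop below finish once upstream is done
            }
        }

        for (token in signal) {
            // Take whatever has accumulated since the last emission.
            val next = lock.withLock { pending.also { pending = null } }
            if (next != null) emit(next.value)
        }
    }
}
```

In this sketch an exception thrown by upstream or by `merge` fails the child coroutine, which cancels the scope and is rethrown to the collector. With a slow collector, `flowOf(1, 2, 3).conflateWith(Int::plus)` emits values whose total is always 6, however the elements happen to be grouped.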

In my case, this would allow merging a bunch of updates that arrived while slowComputation was running into a single update, avoiding intermediate recalculations while still recalculating only the data affected by that bunch.
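To make the use case concrete, the `merge` callback could look roughly like this. `UpdateBatch`, `mergeBatches` and `applyBatch` are hypothetical names, and the update is simplified to a map of changed keys, which the issue does not specify.

```kotlin
// Hypothetical batch type: a map of changed keys stands in for DataUpdate.
data class UpdateBatch(val changes: Map<Int, String>)

// Candidate merge callback: union of both batches, the newer value wins on a key clash.
fun mergeBatches(old: UpdateBatch, new: UpdateBatch): UpdateBatch =
    UpdateBatch(old.changes + new.changes)

// Recompute only the keys touched by the (possibly merged) batch and reuse the rest.
fun applyBatch(
    previous: Map<Int, String>,
    batch: UpdateBatch,
    slowComputation: (Int, String) -> String,
): Map<Int, String> =
    previous + batch.changes.mapValues { (key, value) -> slowComputation(key, value) }
```

Two batches that touch keys {1, 2} and {2} merge into one batch over {1, 2}, so the slow computation runs twice instead of three times.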

I failed to implement exactly this function based on the conflate implementation, as I don't really understand what ChannelFlowOperator is. But I have a prototype of conflate + map based on StateFlow:

fun <T, R> Flow<T>.conflatedMap(merge: (T, T) -> T, process: (T) -> R) = flow {
    coroutineScope {
        val local = MutableStateFlow<T?>(null)
        launch {
            collect { new ->
                local.update { old -> if (old == null) new else merge(old, new) }
            }
        }
        while (true) {
            val data = local.getAndUpdate { null }
                ?: local.filterNotNull().first().let { local.getAndUpdate { null } }
            emit(process(data))
        }
    }
}

But it has a lot of issues. For example, it doesn't propagate the end of the flow correctly, doesn't handle nullable types correctly, and I'm not sure it works correctly if merge or process throws. It seems that implementing it fully correctly would require a deeper understanding of flow invariants than I have.

He-Pin commented 1 year ago

@kunyavskiy Do you mean something like https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/conflate.html ?

kunyavskiy commented 1 year ago

> @kunyavskiy Do you mean something like https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/conflate.html ?

Yes, thanks for the nice example of the already existing API in another project.

qwwdfsad commented 1 month ago

See also #3316