Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Flow.materialize/dematerialize operators #2092

Open elizarov opened 4 years ago

elizarov commented 4 years ago

This is a stub issue to introduce Flow.materialize and Flow.dematerialize operators that would materialize flow completion (both normal completion and error) and would be primarily designed to integrate with sharing operators (see #2047). The detailed design is TBD.

eduanb commented 1 year ago

My 2c on why this should be given higher priority.

There are real-world use cases for dividing up a Flow using filters and processing them separately. Take, for instance, reading a large log file. You'd like to handle INFO/WARN/DEBUG logs separately so you share the flow and filter individually. However, this code never terminates:

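val logsFlow: Flow<LogMessage> = readLogFile()
val logsSharedFlow = logsFlow.shareIn(this, SharingStarted.Eagerly)
// Each count() suspends forever: a SharedFlow never completes, even after readLogFile() is exhausted.
val infoLogs = logsSharedFlow.filter { it.type == INFO }.count()
val warnLogs = logsSharedFlow.filter { it.type == WARN }.count()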

I'm using count here only as an example (which could be achieved in other ways); real processing will be much more complex in practice.
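To make the non-termination concrete, here is a minimal sketch, assuming a three-element upstream in place of the log file. The function name countWithTimeout, the 500 ms timeout, and replay = 3 are illustrative choices, not part of the original report. The shared flow replays all three values to the collector, but count() still never returns because a SharedFlow never signals completion, so the timeout fires.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the count, or null if count() did not finish in time.
fun countWithTimeout(): Int? = runBlocking {
    // A separate scope for sharing, so runBlocking is not kept alive by the sharing coroutine.
    val sharingScope = CoroutineScope(Job())
    try {
        val shared = flowOf(1, 2, 3).shareIn(sharingScope, SharingStarted.Eagerly, replay = 3)
        // The upstream is finite, but the shared flow never completes, so count() keeps waiting.
        withTimeoutOrNull(500) { shared.count() }
    } finally {
        sharingScope.cancel()
    }
}

fun main() {
    println(countWithTimeout()) // prints "null"
}
```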

This was possible a while back using broadcastIn, but that functionality has since been removed. See this answer: https://stackoverflow.com/questions/57807545/how-to-split-a-kotlin-flow-into-2-flows

So as I see it, you have 4 options at the moment:

As far as I understand, this is what the original shareIn proposal was about https://github.com/Kotlin/kotlinx.coroutines/issues/1261

In terms of materialize/dematerialize, I find it a bad API. Here are my reasons:

I would propose that SharedFlow be materialized by default. Or at least add some parameter to shareIn(), like materialize=true, or create a materializedShareIn() shorthand.

pacher commented 1 year ago

@eduanb This has been discussed multiple times, and the answer has always been (example) that share is the wrong operator for this use case and that there should be a dedicated replicate operator. However, I don't think it is anywhere on the roadmap, so it will not be implemented any time soon. There is no open issue about it to begin with.

eduanb commented 1 year ago

@pacher Thank you for sharing. After reading more from that post, it seems like replicate is what I need. I originally thought that materialize would solve it. It's unfortunate that it is not being looked at right now, as it is a common use case, especially on the server side.

Side note: a replicate operator reminds me of the Wire Tap pattern from Enterprise Integration Patterns (EIP). That might be another possible solution/naming worth exploring.

elizarov commented 1 year ago

Just in case anyone actually needs materialize/dematerialize (with all the caveats that it usually means you are looking at your problem from the wrong perspective), they are extremely easy to implement in your own code:

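import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.transformWhile

// Either a regular value or the terminal event (exception == null means normal completion).
sealed class ValueOrCompletion<out T> {
    data class Value<out T>(val value: T) : ValueOrCompletion<T>()
    data class Completion(val exception: Throwable?) : ValueOrCompletion<Nothing>()
}

// Wraps every value and appends exactly one Completion element at the end.
// Note: runCatching catches every Throwable, including CancellationException.
fun <T> Flow<T>.materializeCompletion(): Flow<ValueOrCompletion<T>> = flow {
    val result = runCatching {
        collect { emit(ValueOrCompletion.Value(it)) }
    }
    emit(ValueOrCompletion.Completion(result.exceptionOrNull()))
}

// Unwraps values and stops at the first Completion, rethrowing its exception if there is one.
fun <T> Flow<ValueOrCompletion<T>>.dematerializeCompletion(): Flow<T> = transformWhile { vc ->
    when (vc) {
        is ValueOrCompletion.Value -> {
            emit(vc.value)
            true
        }
        is ValueOrCompletion.Completion -> {
            vc.exception?.let { throw it }
            false
        }
    }
}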
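
For illustration, here is a rough sketch of how these two operators might be combined with shareIn to split one upstream into several terminating flows, as in the log-file example earlier in the thread. It repeats the definitions above so that it is self-contained. The function splitCounts, the integer upstream, and replay = 16 are assumptions made for this example only: the replay size is chosen to be at least the total number of elements so that every subscriber sees all values plus the Completion marker, which is not practical for large inputs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

sealed class ValueOrCompletion<out T> {
    data class Value<out T>(val value: T) : ValueOrCompletion<T>()
    data class Completion(val exception: Throwable?) : ValueOrCompletion<Nothing>()
}

fun <T> Flow<T>.materializeCompletion(): Flow<ValueOrCompletion<T>> = flow {
    val result = runCatching { collect { emit(ValueOrCompletion.Value(it)) } }
    emit(ValueOrCompletion.Completion(result.exceptionOrNull()))
}

fun <T> Flow<ValueOrCompletion<T>>.dematerializeCompletion(): Flow<T> = transformWhile { vc ->
    when (vc) {
        is ValueOrCompletion.Value -> { emit(vc.value); true }
        is ValueOrCompletion.Completion -> { vc.exception?.let { throw it }; false }
    }
}

// Hypothetical example: counts even and odd numbers from a single shared upstream.
fun splitCounts(): Pair<Int, Int> = runBlocking {
    // Sharing runs in its own scope so it can be cancelled once both counts are done.
    val sharingScope = CoroutineScope(Job())
    try {
        val shared = flowOf(1, 2, 3, 4, 5)
            .materializeCompletion() // completion becomes a regular element
            .shareIn(sharingScope, SharingStarted.Eagerly, replay = 16)
        // Each subscriber dematerializes, so its flow ends at the Completion marker.
        val evens = async { shared.dematerializeCompletion().filter { it % 2 == 0 }.count() }
        val odds = async { shared.dematerializeCompletion().filter { it % 2 != 0 }.count() }
        evens.await() to odds.await()
    } finally {
        sharingScope.cancel()
    }
}

fun main() {
    println(splitCounts()) // prints "(2, 3)"
}
```

Unlike a plain shared flow, both count() calls return here because each subscriber stops collecting when it reaches the Completion element.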