Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

statefulTransform for flow #2580

Open phiSgr opened 3 years ago

phiSgr commented 3 years ago

Sorry for not being able to provide a real life use-case as I don't get to write Kotlin at work.


In this SO question, OP asks for a distinct method for Flow.

As of opening this issue, the answer given is:

fun <T> Flow<T>.distinct(): Flow<T> = with(mutableSetOf<T>()) {
    this@distinct.filter{ add(it) }
}

It is subtly wrong, as the flow has one shared mutable state, the mutable set, that is reused over multiple collections.
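A minimal sketch of how the problem shows up, assuming kotlinx.coroutines is on the classpath. The `distinct` below is the answer quoted above; the `main` function and its sample values are only illustrative.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// The quoted answer: the set is created once, when distinct() is called,
// not once per collection.
fun <T> Flow<T>.distinct(): Flow<T> = with(mutableSetOf<T>()) {
    this@distinct.filter { add(it) }
}

fun main(): Unit = runBlocking {
    val deduplicated = flowOf(1, 1, 2, 3, 3).distinct()

    val first = deduplicated.toList()  // the shared set starts out empty
    val second = deduplicated.toList() // the shared set already holds 1, 2, 3

    println(first)  // [1, 2, 3]
    println(second) // []
}
```

The second collection comes back empty because every element was already recorded during the first one.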


I suggest adding the following higher-order function. It is inspired by statefulMapConcat from Akka Streams.

fun <T, R> Flow<T>.statefulTransform(
    transformSupplier: () -> suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow {
    // Called once per collection, so each collector gets fresh state.
    val transform = transformSupplier()
    collect { value ->
        transform(value)
    }
}

Then distinct can be written as

fun <T> Flow<T>.distinct(): Flow<T> = statefulTransform<T, T> {
    with(mutableSetOf<T>()) {
        { if (add(it)) emit(it) }
    }
}

BTW I cannot make @BuilderInference work with this.

qwwdfsad commented 3 years ago

That's a great suggestion, we definitely will be looking towards that. It fits perfectly with our idea of orthogonal and composable operators that can be used to express any other transformation, similar to transform, transformWhile and so on.

It seems that not only distinct, but also distinctUntilChanged, scan, runningReduce and others can be expressed this way, which is a good sign.
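As a rough illustration of that claim, here is a sketch of two such operators built on the proposed function. `statefulTransform` is the operator suggested in this issue, not part of kotlinx.coroutines, and `dropConsecutiveDuplicates` and `runningSum` are hypothetical names picked to avoid clashing with the library's own operators.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// The operator proposed in this issue (not a library function).
fun <T, R> Flow<T>.statefulTransform(
    transformSupplier: () -> suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow {
    val transform = transformSupplier() // fresh state for every collection
    collect { value -> transform(value) }
}

// distinctUntilChanged-like behaviour: drop a value equal to the previous one.
fun <T> Flow<T>.dropConsecutiveDuplicates(): Flow<T> = statefulTransform<T, T> {
    var hasPrevious = false
    var previous: T? = null
    val step: suspend FlowCollector<T>.(T) -> Unit = { value ->
        if (!hasPrevious || previous != value) {
            hasPrevious = true
            previous = value
            emit(value)
        }
    }
    step
}

// runningReduce-like behaviour: emit the sum of all values seen so far.
fun Flow<Int>.runningSum(): Flow<Int> = statefulTransform<Int, Int> {
    var sum = 0
    val step: suspend FlowCollector<Int>.(Int) -> Unit = { value ->
        sum += value
        emit(sum)
    }
    step
}

fun main(): Unit = runBlocking {
    println(flowOf(1, 1, 2, 2, 1).dropConsecutiveDuplicates().toList()) // [1, 2, 1]
    println(flowOf(1, 2, 3).runningSum().toList())                      // [1, 3, 6]
}
```

Binding the inner lambda to a typed `val` sidesteps both the inference problem and the trailing-lambda parsing issue, at the cost of some verbosity.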

Regarding inference, there is a really inconvenient issue that may turn out to be a blocker for the whole design: https://youtrack.jetbrains.com/issue/KT-45657

He-Pin commented 2 years ago

@qwwdfsad any update on this?

JakeWharton commented 2 years ago

I'm having a hard time seeing the proposed function signature as being clearer than having users do what the implementation does. As far as I can tell, all it does is remove 8 characters from having to call collect yourself.

Compare

fun <T> Flow<T>.distinct(): Flow<T> = statefulTransform<T, T> {
    with(mutableSetOf<T>()) {
        { if (add(it)) emit(it) }
    }
}

with

fun <T> Flow<T>.distinct(): Flow<T> = flow {
    with(mutableSetOf<T>()) {
        collect { if (add(it)) emit(it) }
    }
}

Despite having to type "collect", you still wind up saving characters because "flow" is so much shorter than "statefulTransform" as the root function call.

Moreover, nothing about statefulTransform prevents someone from the original errant pattern of putting the mutableSetOf call outside the transform lambda.

Continuing, the use of with is overly cute and I would discourage it in real code. So the two functions as written more clearly would be

fun <T> Flow<T>.distinct(): Flow<T> = statefulTransform<T, T> {
    // The semicolon keeps the lambda below from being parsed as a trailing lambda of mutableSetOf().
    val seen = mutableSetOf<T>();
    { if (seen.add(it)) emit(it) }
}

with

fun <T> Flow<T>.distinct(): Flow<T> = flow {
    val seen = mutableSetOf<T>()
    collect { if (seen.add(it)) emit(it) }
}

I would be less stingy on newlines and write

fun <T> Flow<T>.distinct(): Flow<T> = flow {
    val seen = mutableSetOf<T>()
    collect {
        if (seen.add(it)) emit(it)
    }
}

I think this is a perfectly reasonable pattern and is a very readable code snippet.

He-Pin commented 2 years ago

Thanks for sharing, I just added a statefulmap operator which will handle oncomplete. https://github.com/akka/akka/issues/31077

I need to learn me some kotlin

JakeWharton commented 2 years ago

So I guess the one thing that statefulTransform has over using the flow factory is that it's an extension of Flow allowing you to Not Break The Chain™.

In that Akka issue someone writes a sequence of operators something like

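(0..10).asFlow()
    .statefulTransform {
        // The semicolon keeps the lambda below from being parsed as a trailing lambda of mutableSetOf().
        val seen = mutableSetOf<Int>();
        { if (seen.add(it)) emit(it) }
    }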
    .otherStuff()

To do this with flow today you would have to do something like .run and then flow, which creates double nesting. (It has to be run rather than apply, since apply returns its receiver and would discard the new flow.)

(0..10).asFlow()
    .run {
        flow {
            val seen = mutableSetOf<Int>()
            collect { if (seen.add(it)) emit(it) }
        }
    }
    .otherStuff()

So perhaps the issue here is really that we need an extension form of the flow factory that automatically brings its receiver into scope the same way as an extension function would?

I wish the existing transform function was named transformEach (because it's T -> O) so that we could have transform be this function of Flow<T> -> Flow<O>. Maybe we call it flowTransform?

fun <T, O> Flow<T>.flowTransform(@BuilderInference block: suspend FlowCollector<O>.() -> Unit): Flow<O> {
  return flow(block)
}

This fixes the double nesting:

(0..10).asFlow()
    .flowTransform {
        val seen = mutableSetOf<Int>()
        collect { if (seen.add(it)) emit(it) }
    }
    .otherStuff()

Its one downside is that when writing an actual extension function such as fun <T> Flow<T>.distinct(): Flow<T> both flow and flowTransform will now work and be suggested. But maybe that's okay.

JakeWharton commented 2 years ago

Er, whoops the implementation of that function was just a placeholder I was supposed to replace before sending! The real one needs to be slightly more complicated as the lambda needs to have both the upstream Flow and the downstream FlowCollector in scope.

Maybe a job for the new context receivers eventually?
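Until something like context receivers is available, one possible shape is to hand the upstream flow to the lambda as an explicit parameter. This is only a sketch under that assumption, not the implementation intended above; the `upstream` parameter name and the sample pipeline are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical variant of flowTransform: the downstream FlowCollector is the
// receiver, and the upstream Flow is passed in as an ordinary argument.
fun <T, O> Flow<T>.flowTransform(
    block: suspend FlowCollector<O>.(upstream: Flow<T>) -> Unit
): Flow<O> {
    val upstream = this
    return flow { block(upstream) }
}

fun main(): Unit = runBlocking {
    val result = (0..10).asFlow()
        .map { it % 3 }
        .flowTransform<Int, Int> { upstream ->
            val seen = mutableSetOf<Int>() // created once per collection
            upstream.collect { if (seen.add(it)) emit(it) }
        }
        .toList()

    println(result) // [0, 1, 2]
}
```

The chain stays unbroken, at the cost of having to write `upstream.collect` rather than a bare `collect`. The type arguments are spelled out here so the example does not depend on builder inference.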

phiSgr commented 2 years ago

Continuing, the use of with is overly cute and I would discourage it in real code.

Agreed, I was side-stepping the issue of trailing lambdas.

He-Pin commented 1 year ago

@JakeWharton Would you like to take a look at my related PR for this? Thanks. @phiSgr @qwwdfsad I have updated the PR.

qwwdfsad commented 1 year ago

Copying my response from the PR, so the discussion can be continued here if necessary:

With stateful transformations in their current form, we are heavily approaching the readability boundary and the limit of curly braces in a single declaration.

E.g. the example from the tests

fun <T> Flow<T>.zipWithIndex(): Flow<Pair<T, Long>> = statefulMap({ 0L }) { index, value ->
    return@statefulMap Pair(index + 1L, Pair(value, index))
}

can be rewritten using already existing mechanisms:

fun <T> Flow<T>.zipWithIndex(): Flow<Pair<T, Long>> = flow {
    var state = 0L
    collect {
        emit(Pair(it, state++))
    }
}

While the proposed version is slightly shorter, it's arguably less readable and not less error-prone: it's still easy to accidentally capture the wrong value. Unnamed lambda-generator of the default value, as well as using pair for the control flow seem to obscure the readability while bringing not that many benefits: it doesn't encapsulate a non-trivial implementation detail, it doesn't help readability much, it introduces one more way to do the same thing, and chaining such operators in place does not help the declarativity of the operators chain. Apart from that, it introduces a new pattern (operator({ state }) { _op_}) into the core library, and this is the territory where we want to be extra safe.

All in all, we are not ready to accept the CR in its current API shape.

He-Pin commented 1 year ago

@qwwdfsad Yes and no. How can you implement grouped(2) over 5 elements in a simpler shape without losing data, in particular the last element? With

        val flow = flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
            emit(5)
        }.grouped(2).toList()

    private fun <T> Flow<T>.grouped(size: Int): Flow<List<T>> = flow {
        val acc = mutableListOf<T>()

        collect { value ->
            acc.add(value)
            if (acc.size == size) {
                val list = acc.toList()
                acc.clear()
                emit(list)
            }
        }
    }

This only gives you List(1, 2), List(3, 4); the trailing 5 is lost.

And the shape is not great because Kotlin doesn't have (A, B) tuples, only Pair<A, B>.

statefulmap is another kind of mapAccumulate.

Wait: https://youtrack.jetbrains.com/issue/KT-10468/Context-receivers-multiple-receivers-on-extension-functions-properties
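For reference, a sketch of one way the grouped example could keep its last partial group with the existing flow builder, assuming normal upstream completion. It relies on collect returning only after the upstream has finished, so the remainder can be flushed afterwards. This is an illustration, not a version any participant posted.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun <T> Flow<T>.grouped(size: Int): Flow<List<T>> = flow {
    require(size > 0) { "size must be positive" }
    val acc = mutableListOf<T>() // created once per collection

    collect { value ->
        acc.add(value)
        if (acc.size == size) {
            emit(acc.toList()) // emit a copy, then reuse the buffer
            acc.clear()
        }
    }

    // Reached only when the upstream completed without an exception:
    // flush whatever is left in the buffer.
    if (acc.isNotEmpty()) emit(acc.toList())
}

fun main(): Unit = runBlocking {
    println(flowOf(1, 2, 3, 4, 5).grouped(2).toList()) // [[1, 2], [3, 4], [5]]
}
```

If the upstream fails or the collector is cancelled, the code after collect is skipped, so the partial group is dropped in that case.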

He-Pin commented 1 year ago

@qwwdfsad When you come up with a better solution, ping me. I would like to know about it and use it :)