Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Decouple switching logic from switchMap operator and consistent naming #1335

Closed elizarov closed 5 years ago

elizarov commented 5 years ago

We currently have a switchMap { ... } operator that is similar to the corresponding switchMap operator from Rx but, from the standpoint of Kotlin coroutines, looks like an amalgam of different operations that might need to be decoupled. The most basic operation seems to be this one, tentatively named switchTransform:

fun <T, R> Flow<T>.switchTransform(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>

It would be similar to Flow.transform, with the difference that transform waits for the transformation of each element to complete, while switchTransform would cancel the ongoing transformation when a new element arrives and start it again for that element.
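A minimal sketch of the proposed switchTransform, assuming kotlinx.coroutines 1.6 or later (where FlowCollector is a fun interface). It uses the public channelFlow builder rather than library internals; switchTransform is the name from the proposal, and switchTransformDemo is a hypothetical helper. Each new upstream element cancels the running transformation and starts a fresh one.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sketch of the proposed operator (not a library API), built on channelFlow.
fun <T, R> Flow<T>.switchTransform(
    transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> {
    val upstream = this
    return channelFlow<R> {
        var current: Job? = null
        upstream.collect { value ->
            // A new element arrived: cancel the ongoing transformation and wait for it to finish.
            current?.cancelAndJoin()
            // UNDISPATCHED starts the new transformation immediately, before collect continues.
            current = launch(start = CoroutineStart.UNDISPATCHED) {
                // Forward everything the transformation emits into the channel.
                val downstream = FlowCollector<R> { send(it) }
                downstream.transform(value)
            }
        }
        // channelFlow completes only after its children (the last transformation) complete.
    }
}

// Hypothetical demo: emits each element at once, then a second value after a delay.
fun switchTransformDemo(): List<Int> = runBlocking {
    flowOf(1, 2, 3)
        .switchTransform<Int, Int> { v ->
            emit(v)       // reached for every element
            delay(100)    // the transformations for 1 and 2 are cancelled here
            emit(v * 10)  // reached only for the last element
        }
        .toList()
}
```

switchTransformDemo returns [1, 2, 3, 30]: the first emission of every element gets through, but only the last transformation survives its delay.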

Now, note that the transform operator can be used to implement a host of basic operators:

filter(predicate) = transform { if (predicate(it)) emit(it) }
map(block) = transform { emit(block(it)) }
flattenConcat() = transform { emitAll(it) }
flatMapConcat(block) = map(block).flattenConcat() = transform { emitAll(block(it)) }
onEach(block) = transform { block(it); emit(it) }
collect(block) = transform { block(it) }.collect()
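The identities above can be checked with a small sketch. The operator names with the Via suffix and the transformDemo helper are hypothetical, chosen so they do not clash with the real kotlinx.coroutines operators; each body is the transform expression from the list.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

fun <T> Flow<T>.filterVia(predicate: suspend (T) -> Boolean): Flow<T> =
    transform { if (predicate(it)) emit(it) }

fun <T, R> Flow<T>.mapVia(block: suspend (T) -> R): Flow<R> =
    transform { emit(block(it)) }

fun <T> Flow<Flow<T>>.flattenConcatVia(): Flow<T> =
    transform { emitAll(it) }

fun <T, R> Flow<T>.flatMapConcatVia(block: suspend (T) -> Flow<R>): Flow<R> =
    transform { emitAll(block(it)) }

fun <T> Flow<T>.onEachVia(block: suspend (T) -> Unit): Flow<T> =
    transform { block(it); emit(it) }

// Terminal: nothing is emitted downstream, so the result type is Nothing.
suspend fun <T> Flow<T>.collectVia(block: suspend (T) -> Unit): Unit =
    transform<T, Nothing> { block(it) }.collect()

// Hypothetical demo chaining the operators above.
fun transformDemo(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    flowOf(1, 2, 3, 4)
        .filterVia { it % 2 == 0 }                // 2, 4
        .mapVia { it * 10 }                       // 20, 40
        .flatMapConcatVia { flowOf(it, it + 1) }  // 20, 21, 40, 41
        .onEachVia { seen += it }
        .collectVia { }
    seen
}
```

transformDemo returns [20, 21, 40, 41]: filtering keeps 2 and 4, mapping gives 20 and 40, and each of those is expanded into two values.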

So, if we have a basic switchTransform operator, it would be fitting to potentially have (or at least reserve the names for) a whole family of switchXxx operators, defined like the ones above but with transform replaced by switchTransform in their implementations.

Moreover, there is a use case (see #1269) for the collectLatest terminal operator that should be called switchCollect under this proposed nomenclature, because:

switchCollect(block) = switchTransform { block(it) }.collect()
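A sketch of switchCollect under the proposed naming, assuming the library's transformLatest (which has the switchTransform semantics described above and is marked @ExperimentalCoroutinesApi) is available. switchCollect and switchCollectDemo are hypothetical names.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking

// Hypothetical terminal operator: switchTransform { block(it) }.collect()
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun <T> Flow<T>.switchCollect(block: suspend (T) -> Unit) {
    // Nothing is emitted downstream, so the result type is Nothing.
    transformLatest<T, Nothing> { block(it) }.collect()
}

// Hypothetical demo recording which invocations of the block start and finish.
fun switchCollectDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val started = mutableListOf<Int>()
    val finished = mutableListOf<Int>()
    flowOf(1, 2, 3).switchCollect { v ->
        started += v
        delay(100)  // a newer element cancels the block while it waits here
        finished += v
    }
    started to finished
}
```

Every element starts the block, but only the last one finishes: started is [1, 2, 3] and finished is [3]. kotlinx.coroutines ships this terminal operator as collectLatest.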

However, this switchXxx nomenclature has the following problem.

Under the proposed switchXxx nomenclature, the switchMap operator should have the same signature as map (operating on Flow<T>) and be equivalent to switchTransform { emit(block(it)) }. But we already have an experimental switchMap operator that operates on Flow<Flow<T>> and does switchTransform { emitAll(block(it)) }, which under the new nomenclature should have been called switchFlattenMapConcat. It is not clear how to proceed.

LouisCAD commented 5 years ago

To me, "Latest" as a suffix is more meaningful than "switch" as a prefix, particularly regarding the cancellation behavior. I'd still like to have switchMap as an alias for flatMapConcatLatest, since it is more concise and less of a mouthful.

That's only my take; I'm interested to know what you and others think about the switch prefix and the Latest suffix, or whether there are other naming alternatives.

zach-klippenstein commented 5 years ago

switchTransform (or whatever it ends up being called) feels like a really powerful operator. The FlowCollector API is so simple and powerful that it is great to work with directly.

I agree with @LouisCAD: the switch term seems to confuse most people I've seen learning Rx. As prior art, ReactiveSwift has also replaced it with latest. They have also taken a parameterized approach: their flattening operators take a FlattenStrategy parameter that controls merge/concat/latest. This is nice because you only have to learn one set of operator names. Documentation is here: https://github.com/ReactiveCocoa/ReactiveSwift/blob/master/Documentation/BasicOperators.md#flattening-event-streams
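For comparison, a parameterized flattening operator in the ReactiveSwift style could look like the sketch below. FlattenStrategy, flatMapWith and flattenStrategyDemo are hypothetical; the three branches delegate to the real flatMapConcat, flatMapMerge and flatMapLatest operators (opted in here because they are marked experimental or preview in kotlinx.coroutines).

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical strategy parameter in the spirit of ReactiveSwift's FlattenStrategy.
enum class FlattenStrategy { CONCAT, MERGE, LATEST }

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T, R> Flow<T>.flatMapWith(
    strategy: FlattenStrategy,
    transform: suspend (T) -> Flow<R>
): Flow<R> = when (strategy) {
    FlattenStrategy.CONCAT -> flatMapConcat(transform)
    FlattenStrategy.MERGE -> flatMapMerge(transform = transform)
    FlattenStrategy.LATEST -> flatMapLatest(transform)
}

// Hypothetical demo: each inner flow emits v, waits, then emits v * 10.
fun flattenStrategyDemo(strategy: FlattenStrategy): List<Int> = runBlocking {
    flowOf(1, 2)
        .flatMapWith(strategy) { v -> flow { emit(v); delay(50); emit(v * 10) } }
        .toList()
}
```

With CONCAT the result is [1, 10, 2, 20]; with LATEST the first inner flow is cancelled during its delay, giving [1, 2, 20]; MERGE emits the same values as CONCAT, but their order depends on timing. Because the strategy is a runtime value, every call site keeps all three implementations reachable, which is one reason a strategy enum can get in the way of dead-code elimination.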

I think the names switchFlattenMapConcat and flatMapConcatLatest are also more confusing: "concat" is only meaningful when the streams aren't truncated. If the streams are truncated, as "latest" implies, there's no difference between concatenation and merging, because concurrent emissions are prevented by definition.

elizarov commented 5 years ago

OK. So here is a take on the xxxLatest nomenclature. We can have transformLatest, which cancels the ongoing transformation, and the following operators that could be derived from it:

mapLatest(block) = transformLatest { emit(block(it)) }
flattenLatest() = transformLatest { emitAll(it) } 
flatMapLatest(block) = map(block).flattenLatest() = transformLatest { emitAll(block(it)) }
collectLatest(block) = transformLatest { block(it) }.collect()
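kotlinx.coroutines provides transformLatest and mapLatest (both marked @ExperimentalCoroutinesApi). The sketch below checks the mapLatest identity from the list above by running the same slow block through both operators; latestFamilyDemo is a hypothetical helper.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun latestFamilyDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    // A block slow enough that each newer element cancels the previous invocation.
    val block: suspend (Int) -> Int = { delay(50); it * 2 }
    val upstream = flowOf(1, 2, 3)
    // mapLatest(block) ...
    val viaMapLatest = upstream.mapLatest(block).toList()
    // ... behaves like transformLatest { emit(block(it)) }
    val viaTransformLatest = upstream.transformLatest<Int, Int> { emit(block(it)) }.toList()
    viaMapLatest to viaTransformLatest
}
```

Both lists are [6]: the block for elements 1 and 2 is cancelled during its delay, so only 3 * 2 is emitted.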

This would also mean that we deprecate switchMap and rename it to flatMapLatest.

This nomenclature produces the following distinctly named variants of the flatMap operator with different merging strategies:

flatMapConcat
flatMapMerge
flatMapLatest

Indeed, I like the suffix nomenclature more. I don't like introducing a strategy enum, though: it only makes subsequent dead-code elimination harder on Android via R8, in Kotlin/JS, and in Kotlin/Native.

P.S. There might be some confusion with the combineLatest operator, which also ends with the Latest suffix but is otherwise completely unrelated to the above xxxLatest family. Having thought about it a bit, I don't immediately see it as a big issue, even though it still bugs me a little.

LouisCAD commented 5 years ago

I'm wondering whether combineLatest should be renamed to combineLast, as it doesn't cancel the combining lambda, and later be complemented by a combineLatest or mergeLatest operator that cancels the merging/combining lambda when a new value arrives.

zach-klippenstein commented 5 years ago

I would interpret combineLast to mean it waits until all the flows complete, then combines the last values emitted before completion.

This is a general issue that affects all operators that take suspend functions, isn't it? Every such operator could have a cancelling and a non-cancelling variant. A more scalable solution might be to use a dedicated operator: assume that suspending operators do not cancel by default, and introduce a cancelOnNext() operator that cancels the downstream emit call whenever the upstream flow emits a new value. I think this could solve the transform/flatMap/collect cases too.

The implementation would basically be the same as switchMap's:

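// Note: scopedFlow and ChildCancelledException are internal to kotlinx.coroutines,
// so this snippet only compiles inside the library itself.
fun <T> Flow<T>.cancelOnNext(): Flow<T> = scopedFlow { downstream ->
    var previousFlow: Job? = null
    collect { value ->
        // Cancel the downstream work started for the previous value and wait for it to finish
        previousFlow?.cancel(ChildCancelledException())
        previousFlow?.join()
        // Undispatched to have better user experience in case of synchronous flows
        previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
            downstream.emit(value)
        }
    }
}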

So then transformLatest would be cancelOnNext().transform(block).

elizarov commented 5 years ago

I'm wondering if combineLatest shouldn't be renamed to combineLast

@LouisCAD That is indeed something to ponder. It did not occur to me that combineLatest can be viewed this way.

I planned to have a separate design discussion on extracting the basic functionality that drives both the (current) combineLatest and zip operators, that is, the core functionality that allows collecting from multiple flows at the same time, and on the name for the basic operator that does it. This is an additional consideration for that bucket.

elizarov commented 5 years ago

@zach-klippenstein Terrific. This (tentative) cancelOnNext is the core primitive. So we can have:

flattenLatest() = cancelOnNext().flattenConcat()
// Note: block in switchMap is cancelled on emission too, so this is the correct impl:
flatMapLatest(block) = cancelOnNext().flatMapConcat(block)
collectLatest(block) = cancelOnNext().collect(block)
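The note about switchMap's block can be observed with the library's flatMapLatest (marked @ExperimentalCoroutinesApi): the suspending block that builds each inner flow runs inside the cancellable part, so a newer element cancels it before it returns. flatMapLatestBlockDemo is a hypothetical helper.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun flatMapLatestBlockDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val blockFinished = mutableListOf<Int>()
    val emitted = flowOf(1, 2, 3)
        .flatMapLatest { v ->
            // Still inside the block: a newer element cancels it during this delay.
            delay(50)
            blockFinished += v
            flowOf(v)
        }
        .toList()
    blockFinished to emitted
}
```

Only element 3 gets past the delay, so both lists are [3]. With map(block).flattenLatest(), the block would run upstream of the cancellation and would therefore complete for every element.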

The trick is how to name it. Note that we don't have the concept of OnNext. On the other hand, it is quite a basic primitive and it looks somewhat similar to conflate:

zach-klippenstein commented 5 years ago

Yes, that name was terrible; I didn't think too much about it.

Maybe a bit verbose, but conflateByDropping vs conflateByCancelling?
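The two behaviors these names describe can be compared with the real conflate and collectLatest operators. In the sketch below (the helper names droppingDemo and cancellingDemo are hypothetical), a slow collector either skips values that arrive while it is busy, or is cancelled by them.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// "Conflate by dropping": values arriving while the collector is busy may be skipped,
// but every value the collector receives is processed to completion.
fun droppingDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val started = mutableListOf<Int>()
    val finished = mutableListOf<Int>()
    flowOf(1, 2, 3).conflate().collect { v ->
        started += v
        delay(50)
        finished += v
    }
    started to finished
}

// "Conflate by cancelling": a new value cancels the collector that is still busy.
fun cancellingDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val started = mutableListOf<Int>()
    val finished = mutableListOf<Int>()
    flowOf(1, 2, 3).collectLatest { v ->
        started += v
        delay(50)
        finished += v
    }
    started to finished
}
```

With conflate, started equals finished and the last value processed is 3, though intermediate values may be skipped. With collectLatest, all three values start but only 3 finishes.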

circusmagnus commented 5 years ago

Just "switch"? "switchOver"? Conflate has a concrete meaning for me in coroutines: it applies to the upstream, not the "sidestream".