Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Flow.collectWhile #1087

Open fvasco opened 5 years ago

fvasco commented 5 years ago

The takeXxx operators share common logic; have you considered collectWhile?

Here is a POC:

@FlowPreview
public fun <T> Flow<T>.take(count: Int): Flow<T> {
    require(count >= 0) { "Take count should be non-negative, but had $count" }
    if (count == 0) return emptyFlow()
    return flow {
        var consumed = 0
        collectWhile { value ->
            emit(value)
            // Stop as soon as the limit is reached, without waiting for one more element.
            ++consumed < count
        }
    }
}

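public suspend fun <T> Flow<T>.collectWhile(action: suspend (value: T) -> Boolean) {
    // Remembers the exception thrown by this call so that only it is swallowed below.
    var cancellationException: CancellationException? = null
    try {
        collect { value ->
            val loop = action(value)
            if (!loop) {
                // Abort the upstream collection by throwing a marker exception.
                val exception = CancellationException()
                cancellationException = exception
                throw exception
            }
        }
    } catch (e: CancellationException) {
        // Rethrow any cancellation that did not originate from this call.
        if (e !== cancellationException) throw e
    }
}

elizarov commented 5 years ago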

Do you see any collectWhile use-cases beyond takeXxx implementation?

JakeWharton commented 5 years ago

Would be used also for #1078 (first() / firstOrNull()) if that is accepted as a valid operator.

fvasco commented 5 years ago

Do you see any collectWhile use-cases beyond takeXxx implementation?

@elizarov your question sounds like: "Should we allow break in a for loop?" Frankly, I don't feel I am the right person to discuss structured programming, especially since the current codebase has already shown that an exception is a sufficient replacement for break.

So I don't see any strong argument for breaking a collect; however, an explicit operator can only be clearer to the reader than a quick-and-dirty goto replacement.

firstXxx can be built using take, as @JakeWharton already noted, so this may look like an argument against this proposal. At the same time, take can be built using takeWhile, yet the current implementation does not do that and instead breaks the collect in the same way. firstXxx and any further custom operators will probably do the same.
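
To make that layering concrete, here is a minimal sketch that uses only public operators. The names takeSketch and firstOrNullSketch are hypothetical and chosen to avoid clashing with the library; this is not how the library implements take or firstOrNull.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical name: `take` expressed through the public `takeWhile`.
// Caveat: takeWhile evaluates its predicate on each incoming element, so this
// version only completes once element number count + 1 arrives (or upstream ends).
fun <T> Flow<T>.takeSketch(count: Int): Flow<T> {
    require(count >= 0) { "Take count should be non-negative, but had $count" }
    if (count == 0) return emptyFlow()
    return flow {
        var consumed = 0 // fresh counter for every collection
        emitAll(this@takeSketch.takeWhile { consumed++ < count })
    }
}

// Hypothetical name: `firstOrNull` expressed through the public `take`.
suspend fun <T : Any> Flow<T>.firstOrNullSketch(): T? {
    var result: T? = null
    take(1).collect { result = it }
    return result
}
```

Both helpers still end the upstream collection through whatever mechanism take and takeWhile use internally, which is the duplication the proposal is pointing at.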

This proposal defines a clean method to break a collect.
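
For readers who want this behaviour without a new operator, a rough equivalent can be assembled from public API. This is a sketch under the assumption that takeWhile followed by the no-argument collect() is an acceptable stand-in; collectWhileSketch is a hypothetical name, not a library function.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical helper: runs `action` for each element and stops collecting
// the first time it returns false. The element that returns false is still
// passed to `action`, matching the POC above.
suspend fun <T> Flow<T>.collectWhileSketch(action: suspend (value: T) -> Boolean) {
    takeWhile { value -> action(value) }.collect()
}
```

The trade-off is readability: the side effects live inside a predicate, which is arguably less obvious to a reader than a dedicated collectWhile.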

fvasco commented 4 years ago

firstOrNull use case #1796

Zhuinden commented 3 years ago

Apparently collectWhile exists but it's internal

https://github.com/Kotlin/kotlinx.coroutines/blob/3cb61fc44bec51f85abde11f83bc5f556e5e313a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt#L126-L142

Most likely on purpose, of course.

lowasser commented 3 years ago

We've come across a few cases where there is state we want to update that affects downstream operations. The first case is roughly takeUnits, where each element may represent a different number of units, especially bytes, where a Flow<ByteString>.takeBytes(bytes: Long) needs to measure each byte string. (Also note that the last ByteString may be truncated rather than passed through, which is an additional mismatch for takeWhile. transformWhile is a better fit, but you still need to wrap it in flow { var bytesEncountered = 0; emitAll(transformWhile { ... }) }, which would be more intuitive as collectWhile.)
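
A minimal sketch of the takeBytes shape described here, written with the transformWhile wrapper. It assumes ByteArray as a stand-in for ByteString so that the example needs no extra dependency; the operator itself is hypothetical and the truncation policy is only one possible choice.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical operator. ByteArray is an assumption standing in for ByteString.
fun Flow<ByteArray>.takeBytes(bytes: Long): Flow<ByteArray> {
    require(bytes >= 0) { "bytes must be non-negative, but was $bytes" }
    if (bytes == 0L) return emptyFlow()
    return flow {
        var remaining = bytes // per-collection state, hence the flow { } wrapper
        emitAll(
            this@takeBytes.transformWhile { chunk ->
                if (chunk.size <= remaining) {
                    emit(chunk) // the whole chunk fits
                    remaining -= chunk.size
                } else {
                    // The last chunk is truncated instead of passed through.
                    emit(chunk.copyOf(remaining.toInt()))
                    remaining = 0
                }
                remaining > 0 // keep collecting only while bytes are still wanted
            }
        )
    }
}
```

The outer flow { } exists only to give each collection its own counter, which is the boilerplate the comment argues a collectWhile would remove.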

and(Flow<Boolean>) is doable with dropWhile { it }.firstOrNull() == null, but is a little more intuitive with collectWhile.
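
As a sketch of what is possible with public operators today: a flow is all-true exactly when nothing remains after dropping the leading true values, so firstOrNull() returning null means the conjunction holds. The function names below are hypothetical.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical name: conjunction via dropWhile. A non-null first element after
// dropping the leading `true` values must be `false`, so the result is false.
suspend fun Flow<Boolean>.andViaDropWhile(): Boolean =
    dropWhile { it }.firstOrNull() == null

// Hypothetical name: the same result with explicit state, closer in spirit
// to a collectWhile-style loop.
suspend fun Flow<Boolean>.andViaTakeWhile(): Boolean {
    var soFar = true
    takeWhile { soFar = it; it }.collect()
    return soFar
}
```

Both versions stop collecting at the first false element, and both return true for an empty flow.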

We might also suggest calling this doCollectWhile, as the condition "comes at the end" and is evaluated at the end. For example,

suspend fun Flow<Boolean>.and(): Boolean {
  var soFar = true
  doCollectWhile {
    soFar = it
    it
  }
  return soFar
}

lowasser commented 3 years ago

Update: we're no longer quite as convinced about doCollectWhile as a name, but we have also found this useful on SharedFlow. SharedFlows always need a "while" condition, and when there's any sort of state involved in the while condition, collect is usually just a better match than transformWhile or takeWhile.
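
To illustrate the SharedFlow case, here is a small sketch of a stateful "while" condition built from public operators. A SharedFlow never completes on its own, so the collection needs some stopping rule to return at all. sumUntil is a hypothetical helper invented for this example.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical helper: adds up values from a SharedFlow and stops collecting
// once the running total reaches `limit`. Returns the total at that point.
suspend fun SharedFlow<Int>.sumUntil(limit: Int): Int {
    var total = 0 // state that drives the stop condition
    takeWhile { value ->
        total += value
        total < limit
    }.collect()
    return total
}
```

The state update and the stop condition sit together in the predicate here; with a collectWhile they would sit in the body of the collection instead, which is the fit the comment describes.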