Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
12.84k stars 1.83k forks source link

Suggestion for a potential new Flow's `timeout` extension #4143

Open tinder-cesardiez opened 1 month ago

tinder-cesardiez commented 1 month ago

Considerations

  • Did you check the latest version of the library?

Yes

  • Do you actually need this feature? Maybe restructuring your code would neatly eliminate the problem the feature would be solving.

I'm not sure, I just wanted to bring this up since my teammates found this extension to be useful.

Use Case

Simplify Flow's Timeout extension fallbacks

The Shape of the API

fun <T> Flow<T>.timeout(
    timeout: Duration,
    onTimeout: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T> = timeout(timeout).catch { cause ->
    if (cause is TimeoutCancellationException) {
        // Catch the TimeoutCancellationException emitted above.
        // Emit desired item on timeout.
        onTimeout(cause)
    } else {
        // Throw other exceptions.
        throw cause
    }
}

Usage

someFlow().timeout(X.seconds) {
    emitAll(someFallbackFlow())
}

Prior Art (Why - Thought Process)

When I looked at the timeout documentation I thought it was interesting I would need to "copy-paste" the sample of the documentation in order to deal with the fallback the way it's intended, so I thought it would be good to provide an alternative to developers to make this process a tiny bit easier, since otherwise we could run into lots of timeout blocks with very similar catch logic. Ideally we would want to handle exceptions as close as their source as possible, so the code sample made a lot of sense to me, then why not automating that a bit?

If we think the feature request makes sense, I'm happy to go ahead and make a code contribution. I just wanted to make sure this made sense beforehand.

Timeout Related Sources / Issues

tinder-cesardiez commented 1 month ago

@dkhalanskyjb Do you think it'd make sense to make a contribution for this?

fzhinkin commented 1 month ago

@tinder-cesardiez, both core maintainers are on vacation, so it may take a while before the proposal is evaluated.

dkhalanskyjb commented 1 week ago

The exact suggestion has a flaw. Consider this code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds

fun <T> Flow<T>.timeout(
    timeout: Duration,
    onTimeout: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T> = timeout(timeout).catch { cause ->
    if (cause is TimeoutCancellationException) {
        // Catch the TimeoutCancellationException emitted above.
        // Emit desired item on timeout.
        onTimeout(cause)
    } else {
        // Throw other exceptions.
        throw cause
    }
}

fun main() = runBlocking {
    flow<Int> {
        awaitCancellation()
    }.timeout(50.milliseconds)
    .timeout(Duration.INFINITE) {
        println("Should be printed an infinity later")
    }.collect {
    }
}

(runnable version: https://pl.kotl.in/_s0UsTsR4)

timeout with the lambda will catch the exception from an unrelated timeout, and the println will be printed. This can be a real concern when you have several independent parameterized timeouts for one flow.

On the other hand, using timeout with catch is completely transparent: timeout throws an exception on a timeout, and catch catches it, obviously.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds

fun main() = runBlocking {
    flow<Int> {
        awaitCancellation()
    }.timeout(50.milliseconds)
    .timeout(Duration.INFINITE)
    .catch {
        if (cause is TimeoutCancellationException) {
            println("One of the timeouts fired.")
        } else {
            throw cause
        }
    }.collect {
    }
}

This transparency and predictability is pretty valuable. If we introduce a timeout with a lambda, I'm pretty sure we'd want to avoid this gotcha, but then, we will have to introduce some new logic that's not just a combination of the existing operators.

This is not a huge issue if this new operator is actually useful: it has some learning curve on its own, but it's still a fairly straightforward API. Is it useful? In which scenarios? Looking for this exact pattern, I didn't find many people using it: https://grep.app/search?q=timeout%5C%28%5B%5E%29%5D%2A%5C%29%5Cs%2A%5C.catch%5Cs%2A%7B&regexp=true&case=true&filter[lang][0]=Kotlin