Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
13.08k stars 1.85k forks source link

CancellableContinuation.invokeOnCancellation should accept a suspend callback #4154

Closed odedniv closed 4 months ago

odedniv commented 5 months ago

Use case

I have a Java async API that lets me set a callback and clear it. Cancelling that API means clearing the callback, but the API to clear the callback is also async.

// Java async API
fun setWaitForEventCallback(block: () -> Unit)
fun clearWaitForEventCallback(): ListenableFuture<Unit>

I want to convert this to a suspend function, so I use:

suspend fun waitForEvent(): Unit = suspendCancellableCoroutine { continuation ->
  setWaitForEventCallback { continuation.resume(Unit) }
  continuation.invokeOnCancellation { clearWaitForEventCallback() }
}

However this is incorrect, as I'm not waiting for clearWaitForEventCallback, which means there's nothing allowing the caller to synchronize the cancellation of a previous job with a new call:

val job = launch { waitForEvent() }
...
job.cancel()
waitForEvent() // Race! setWaitForEventCallback() might be processed by the backend before clearWaitForEventCallback().

What I really want to call is clearWaitForEventCallback.await(), which would allow the caller to join the job after cancellation to ensure synchronization:

suspend fun waitForEvent(): Unit = suspendCancellableCoroutine { continuation ->
  setWaitForEventCallback { continuation.resume(Unit) }
  continuation.invokeOnCancellation {
    clearWaitForEventCallback().await() // Compilation error! Not a suspend function.
  }
}

val job = launch { waitForEvent() }
...
job.cancelAndJoin() // Waits for clearWaitForEventCallback()
waitForEvent() // No Race!

The Shape of the API

typealias CompletionHandler = (cause: Throwable?) -> Unit
+ typealias SuspendCompletionHandler = suspend (cause: Throwable?) -> Unit

interface CancellableContinuation { ... }
+ interface SuspendCancellableContinuation : CancellableContinuation {
+   abstract fun invokeOnCancellation(handler: SuspendCompletionHandler)
+ }

inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T
+ inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (SuspendCancellableContinuation<T>) -> Unit): T { ... }

I believe adding overloads this way will not be a breaking API change, and so doesn't require a major version bump.

Prior Art

In a normal suspend function, I would catch the CancellationException and would still be in a coroutine, allowing me to invoke other async functions.

suspend fun waitForEvent() {
  try {
    ...
  } catch (e: CancellationException) {
    callSuspendFunctions()
    throw e
  }
}

This means that in theory, invokeOnCancellation can accept a coroutine context. The workaround to the current design flaw is that I have to do this:

suspend fun waitForEvent() {
  try {
    suspendCancellableCoroutine { continuation ->
      setWaitForEventCallback { continuation.resume(Unit) }
      continuation.invokeOnCancellation { /* do nothing, handled in catch */ }
    }
  } catch (e: CancellationException) {
    clearWaitForEventCallback().await()
    throw e
  }
}

This workaround is not obvious, and so every developer needs to figure something out which can easily arrive at an errored solution (as I have already).

odedniv commented 5 months ago

I now realize the Kotlin cancellation is synchronous, and any suspend functions called during the catch (e: CancellationException) { ... } will not really be executed when the job is cancelled.

Any other solutions to this situation? I can think of 2:

  1. Block the thread on cancellation (seems bad):
suspend fun waitForEvent(): Unit = suspendCancellableCoroutine { continuation ->
  setWaitForEventCallback { continuation.resume(Unit) }
  continuation.invokeOnCancellation {
    runBlocking { clearWaitForEventCallback().await() } // Block the thread that cancels.
  }
}
  1. Have a lock that ensures synchronization with the next call (seems error prone):
val cancellationLock = Mutex(false)
suspend fun waitForEvent() {
  lock.withLock {} // Wait for previous cancellation.
  suspendCancellableCoroutine { continuation ->
    setWaitForEventCallback { continuation.resume(Unit) }
    cancellationLock.lock() // Lock only after setWaitForEventCallback.
    continuation.invokeOnCancellation {
      clearWaitForEventCallback().addListener({ cancellationLock.tryUnlock() }, <Executor not from this Job, which may have been cancelled>)
    }
  }
  cancellationLock.tryUnlock() // Finished successfully.
}

fun Mutex.tryUnlock() {
  try { lock.unlick() }
  catch (e: IllegalStateException) {}
}
odedniv commented 5 months ago

Learning about withContext(NonCancellable), I'd argue invokeOnCancellation should accept a suspend function that is executed with withContext(NonCancellable) as that's the exact use case - we need to allow suspension while also ignoring the cancellation, which would be better than using runBlocking as it wouldn't block the thread on suspensions.

Anyone from the kotlinx.coroutines team have an opinion?

odedniv commented 4 months ago

Maybe the question is - why is flow.onCompletion takes a suspend function even though it's likely a cancelled coroutine, and suspendCancellableCoroutine's invokeOnCancellation and callbackFlow's awaitClose take a non-suspend, requiring the caller to use runBlocking.

I'd say all 3 need to accept suspend functions, which should probably be called inside withContext(NonCancellable) as it's otherwise error prone.

dkhalanskyjb commented 4 months ago

Here's what I came up with:

import kotlinx.coroutines.*

fun setWaitForEventCallback(block: () -> Unit) {
    println("setting the callback")
}

suspend fun clearWaitForEventCallback() {
    println("starting the clear...")
    delay(100)
    println("cleared!")
}

suspend fun CoroutineScope.subscribe() {
    suspendCancellableCoroutine<Unit> { cont ->
        setWaitForEventCallback { cont.resume(Unit) {} }
        cont.invokeOnCancellation {
            launch(start = CoroutineStart.ATOMIC) {
                withContext(NonCancellable) {
                    clearWaitForEventCallback()
                }
            }
        }
    }
}

fun main() {
    runBlocking {
        println("Launching")
        val job = launch {
            subscribe()
        }
        println("Launched. Wait a bit.")
        delay(50)
        println("Now cancelling.")
        job.cancelAndJoin()
        println("Success!")
    }
}

Prints:

Launching
Launched. Wait a bit.
setting the callback
Now cancelling.
starting the clear...
cleared!
Success!

Here's how I arrived at it:

Does it make sense?

I should note that it's very uncommon for unsubscription to be asynchronous. Most cases where I've seen this are just from inexperienced developers making everything async, so this could be just subscription = null; return Future(Unit) inside. Or maybe there is a completely valid reason for this, everything's possible. In any case, this isn't how it usually works, so I think shaping our API around this use case would overcomplicate the 99% scenario.

Do you have any insight as to why this unsubscription is async (whereas subscribing is synchronous for some reason)? Is it a mistake on the library authors' part, or is it driven by some actual need?

I'd argue invokeOnCancellation should accept a suspend function that is executed with withContext(NonCancellable) as that's the exact use case

I'm not sure there is a one-size-fits-all solution as to who should actually execute this suspend function. Sometimes, you must (un)subscribe from a specific thread, different from the one doing the cancellation or executing your coroutine, for example. Manually choosing a coroutine scope and launching an asynchronous process seems more controllable and transparent to me.

why is flow.onCompletion takes a suspend function even though it's likely a cancelled coroutine

With all these SharedFlow instances, it's easy to forget that there are finite flows as well :)

flowOf(1, 2, 3).onCompletion { emit(4) }.collect { println(it) }
odedniv commented 4 months ago

Thanks @dkhalanskyjb!

launch(start = CoroutineStart.ATOMIC) {

I'm confused - which CoroutineScope is used here?

Do you have any insight as to why this unsubscription is async

This is an Android API, and "subscribing" is basically sending a callback over an IPC (sending a Binder), and clearing the subscription is also an IPC for the same reason. I personally think some IPCs in Android should be blocking, but there's a growing group that think it shouldn't and there's no consensus. There's also constraints on blocking some threads in Android, including the main and Binder threads.

whereas subscribing is synchronous for some reason

Subscription is also async, that's the main reason I use suspendCancellableCoroutine.

it's easy to forget that there are finite flows as well

Yeah I didn't forget, it's just that onCompletion specifically handles exactly 2 ending reasons - from the provider and the consumer. It stands to reason that consumer ending is roughly 50% of the cases, and it seems like the API being suspend is easily misunderstood for that case as suspension wouldn't work.

That means to me that either the API should be split to 2 (e.g. onDataEnd vs onCancellation - the latter should not accept suspend callback), or consumer ending in suspendCancellableCoroutine should behave like onCompletion. Note emit(4) will also not work on cancellation (even with withContext(NonCancellable)), which may be another thing to consider. I think a good API wouldn't require me to check things in the playground, but I often have to for these kind of edge cases.

Making that callback run in withContext(NonCancellable) by default is a lower priority as at least the user has a choice to do it and maybe you want developers to be explicit about this, but I believe it will make the API less error prone in the common case.

dkhalanskyjb commented 4 months ago

I'm confused - which CoroutineScope is used here?

The receiver of

suspend fun CoroutineScope.subscribe() {

which is, at that point, the launch:

        val job = launch {

This is an Android API, and "subscribing" is basically sending a callback over an IPC

Ok, noted, thanks!

That means to me that either the API should be split to 2

Is this just for API purity, or do you have actual use cases where these two paths should be distinct? An extra lambda is not free: it's going to add to the learning curve in the cases where it's not needed. All APIs have some cost, we need to have at least some reason to add them.

Also, then, it would have to be three lambdas, actually:

Note emit(4) will also not work on cancellation (even with withContext(NonCancellable))

It will simply rethrow the error the flow was going to throw anyway.

Making that callback run in withContext(NonCancellable) by default is a lower priority as at least the user has a choice to do it and maybe you want developers to be explicit about this, but I believe it will make the API less error prone in the common case.

It's simpler than that, actually: if you're inside withContext(NonCancellable), you can't easily get the cancellation behavior back. It may be a common case where collecting coroutine is cancelled, but it's not the only case: https://pl.kotl.in/W7ibaq6iG You very well may have suspend code that you want to run in onCompletion and cancel it if the coroutine is cancelled, and if we put everything in withContext(NonCancellable) by default, you can't do that. If we don't use withContext(NonCancellable) by default, you can add NonCancellable. So, not doing NonCancellable by default is strictly more flexible.

odedniv commented 4 months ago

The receiver of

Oh I missed that receiver, it seems unconventional to accept a context/scope both by declaring the function as a suspend function and as a CoroutineScope receiver, it's not clear which context is used. Can you just use coroutineScope { ... } to generate a scope from the current context?

Is this just for API purity, or do you have actual use cases where these two paths should be distinct?

It's because we've seen that the 2 cases act very differently, one of them is not truly accepting a suspend function, and any suspension and emission will silently fail (as cancellation errors are eaten up).

To be clear I'm OK with a single API, it just needs to work for both cases. I can also understand that at this point it's not worth it to change, from a developer migration perspective.

onDownstreamError

That could just be a nullable exception provided to onDataEnd - the coroutine isn't cancelled so you can continue suspensions, whereas onCancellation is very different, not supporting both suspension and emission (2 thirds of the callback's contract).

It will simply rethrow the error the flow was going to throw anyway.

Oh, not just in cancellation, also in downstream errors? Looks like you can't continue emitting regardless of which error stopped the flow. Odd...

if we put everything in withContext(NonCancellable) by default, you can't do that

Yeah that's sensible. The only issue then is that with suspendCancellableCoroutine you can't do it unless you're creating a new coroutine scope (unless I was wrong about my first comment), and I feel like since it only handles cancellation it fits that it will use withContext(NonCancellable) by default even if onCompletion doesn't.

Let me know what you think

dkhalanskyjb commented 4 months ago

Can you just use coroutineScope { ... } to generate a scope from the current context?

Sure.

That could just be a nullable exception provided to onDataEnd - the coroutine isn't cancelled so you can continue suspensions, whereas onCancellation is very different, not supporting both suspension and emission (2 thirds of the callback's contract).

This logic breaks once we are adding parallelism to the mix. Imagine that you've entered onDataEnd, and immediately the coroutine gets cancelled in parallel. Now, you have to handle that the suspension and emission are missing anyway, and you don't gain anything from separating this code path.

It's because we've seen that the 2 cases act very differently, one of them is not truly accepting a suspend function, and any suspension and emission will silently fail (as cancellation errors are eaten up).

This can happen on any code path due to concurrent cancellation.

The only issue then is that with suspendCancellableCoroutine you can't do it unless you're creating a new coroutine scope (unless I was wrong about my first comment)

You can use some existing coroutine scope if you want. coroutineScope ensures that the function won't return until the asynchronous cancellation is complete, but you could have other requirements. Some APIs are fire-and-forget, for example, and don't need to be awaited.

odedniv commented 4 months ago

This logic breaks once we are adding parallelism to the mix (and follow-up related mentions).

Yeah that's true. I wonder if there's a semantic difference between cancellation happening before or after onCompletion (etc) started. I feel like it's similar to programs where pressing Ctrl-C once starts graceful shutdown, and pressing it again stops the graceful shutdown - which is expected by both the program and the user (actually a "looping catch" is unexpected and a major issue, preventing apps from being killed and requiring kill -9). In other words - nobody expects cancellation or other failures during the finally block to not stop the finally block in its tracks.

Some APIs are fire-and-forget

Yes of course, that's equivalent to synchronous APIs as you basically assume the action will be done by the method returning. That being said, fire-and-forget that executes asynchronously is very ill advised, as it could cause races with follow-up subscriptions (note that in our use case there's a single global callback, the clear method will clear whichever is currently set).

dkhalanskyjb commented 4 months ago

I wonder if there's a semantic difference between cancellation happening before or after onCompletion (etc) started.

In the kotlinx.coroutines framework, cancellation is registered only once. Later attempts to cancel an already-cancelled coroutine will be ignored. If you need custom behavior for repeated cancellation attempts, this should be implemented explicitly via some kind of message passing.

fire-and-forget that executes asynchronously is very ill advised, as it could cause races with follow-up subscriptions

It can be made to work by assigning unique identifiers to subscriptions, like

fun subscribe(): Subscription
fun unsubscribe(subscription: Subscription): Job // you can wait for this specific subscription to finish if you want, but you don't have to
dkhalanskyjb commented 4 months ago

@odedniv, is your original question answered? I think we've strayed a bit from the topic. It's better to stay on point and open separate issues/discussions if there are further questions: this way, if someone finds this issue later, they will have an easier time reading it.

odedniv commented 4 months ago

I agree, we drifted with the discussion on Flow.onComplete, which was fun but we can focus on this issue.

This issue is not really a question, it was a FR - I think just like the catch (e: CancellationException) { ... } block in a suspend function is still a suspend function (and can suspend with withContext(NonCancellable)), so should the invokeOnCancellation and awaitClose blocks.

A follow-up to this feature request is that since invokeOnCancellation is known to execute while the coroutine is cancelled, it can run in a withContext(NonCancellable) by default, but I don't have enough context to fully understand the repercussions.


To answer your last comment, which is outside the focus of this issue (we can stop talking about it here):

Later attempts to cancel an already-cancelled coroutine will be ignored

Sorry, I misread Imagine that you've entered onDataEnd, and immediately the coroutine gets cancelled in parallel, reading onCancellation instead of onDataEnd. To be honest I don't fully understand the parallelism problem, it feels like onCancelled should be called if there's cancellation during onDataEnd. Again, I'm not proposing to split onComplete, just make invokeOnCancellation suspend to match.

It can be made to work by assigning unique identifiers to subscriptions

Yes, but this is not the Java callback API I'm working with. There are different reasonable ways to implement cancellation APIs, the API I'm using roughly uses:

// "onData" is the actual callback, and there can be exactly one callback subscribed, follow-up calls will REPLACE it.
// "onSet" is just notifying that "setting the callback" finished.
fun setCallback(onSet, onData)
// Clears the one and only callback.
// Similarly, "onClear" is just notifying that "clearing the callback" finished.
fun clearCallback(onClear)

Which isn't great but not unreasonable - I need calls to both set and clear to be serial, i.e. if I call the suspend method that sets the callback, cancel it before it's done, and call it again - it must be set-clear-set and not set-set-clear (which would happen if I'm not waiting for the clear).

dkhalanskyjb commented 4 months ago

This issue is not really a question, it was a FR - I think just like the catch (e: CancellationException) { ... } block in a suspend function is still a suspend function (and can suspend with withContext(NonCancellable)), so should the invokeOnCancellation and awaitClose blocks.

invokeOnCancellation is a low-level, deeply technical thing, with unusual limitations that make it unlike catch blocks. From the documentation:

A catch block is allowed to throw exceptions, but:

The installed handler should not throw any exceptions.

There can be more than one catch block, but:

At most one handler can be installed on a continuation.

You can write any normal code in catch, but:

Note: Implementations of CompletionHandler must be fast, non-blocking, and thread-safe. This handler can be invoked concurrently with the surrounding code. There is no guarantee on the execution context in which the handler will be invoked.

There was never a goal to let people write complex logic in invokeOnCancellation. If you want a catch block, then why not write exactly that?

import kotlinx.coroutines.*

fun setWaitForEventCallback(block: () -> Unit) {
    println("setting the callback")
}

suspend fun clearWaitForEventCallback() {
    println("starting the clear...")
    delay(100)
    println("cleared!")
}

suspend fun subscribe() {
    try {
        suspendCancellableCoroutine<Unit> { cont ->
            setWaitForEventCallback { cont.resume(Unit) {} }
        }
    } catch (e: CancellationException) {
        withContext(NonCancellable) {
            clearWaitForEventCallback()
        }
        throw e
    }
}

fun main() {
    runBlocking {
        println("Launching")
        val job = launch {
            subscribe()
        }
        println("Launched. Wait a bit.")
        delay(50)
        println("Now cancelling.")
        job.cancelAndJoin()
        println("Success!")
    }
}
odedniv commented 4 months ago

invokeOnCancellation is a low-level, deeply technical thing

Not sure what that means as it is part of the public API, that IIUC is supposed to help the developer do the right thing.

If you want a catch block, then why not write exactly that?

Oh that's nice, but then it feels like invokeOnCancellation should be a shortcut for that, and maybe like awaitClose the block of suspendCancellableCoroutine should throw if that method is not called during execution of the block.

Given that it's so easy to implement without invokeOnCancellation, I'm not sure why not make invokeOnCancellation support this.

In other words, I still stand by the feature request, but maybe with your solution (that I don't think would be automatically obvious to developers) the feature request is of a lower priority (to me). Feel free to close this issue if you disagree.

Also note that I recently learned the hard way that catching CancellationException manually is extremely delicate, for example you can't tell if it was thrown because the coroutine was cancelled or because the code block threw that exception (e.g. withTimeout). I ended up removing all catch (CancellationException) from my code because it was just too error prone.

dkhalanskyjb commented 4 months ago

@qwwdfsad, what do you think about marking invokeOnCancellation and invokeOnCompletion as DelicateCoroutinesApi? They have some involved contracts to fulfill and are only useful for low-level code.

qwwdfsad commented 4 months ago

I was sitting on this idea for a while as well.

Filed https://github.com/Kotlin/kotlinx.coroutines/issues/4180