Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Cancellation of flow "primitives" and terminal operators #1460

Closed LouisCAD closed 5 years ago

LouisCAD commented 5 years ago

As of 1.3.0-RC2, the following snippet:

suspend fun main() = coroutineScope {
    cancel()
    println(flow { emit("a"); emit("b") }.toList().joinToString())
}

prints a, b.

This means that the functions emit, collect and toList are not cancellable. While I can understand emit and collect being non-cancellable for certain use cases, the fact that toList (and possibly other potentially long waiting operators) doesn't respond to cancellation signals by throwing a CancellationException concerns me. The source flow might itself be built from non-cancellable code (e.g. using only suspendCoroutine, and not the suspendCancellableCoroutine variant), and this might lead to resource leaks that only a program restart would temporarily fix.

Some or all of these functions could be made cancellable by using ensureActive() without incurring an extra dispatch.

I assume there's a design decision behind not making emit and collect cancellable themselves. I'd be interested to know the reasoning.

qwwdfsad commented 5 years ago

Your snippet prints a, b because the given flow chain never actually suspends. It is roughly analogous to ArrayList<String>().apply { add("a"); add("b") }: nothing checks for cancellation here. Adding ensureActive to every possible emit would slow everything down without any significant benefit.

Regarding your concerns about long-running tasks:

> ... (and possibly other potentially long waiting operators) doesn't respond to cancellation signals by throwing a CancellationException concerns me.

Any waiting operation will suspend, and all suspension points are indeed cancellable, so any long-running operation will be cancelled. A lot of useful operators also use channels under the hood and thus will also check for cancellation.

LouisCAD commented 5 years ago

But toList() is suspending. Also, some suspension points might not be cancellable (although they're quite rare).

qwwdfsad commented 5 years ago

It is a suspending function, but in your example it never actually suspends. If you had any kind of real suspension in either the original flow or any intermediate operator (or even in a collect {} call, if there were one), cancellation would be properly triggered.
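A minimal sketch of this point, assuming only the public kotlinx.coroutines API: the flow from the original snippet with a delay() added between the two emissions. The function name collectWithSuspension is hypothetical, and delay(10) merely stands in for any real suspension point.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects a flow in an already-cancelled scope
// and reports whether toList() completed or was cancelled.
suspend fun collectWithSuspension(): String {
    var outcome = "not run"
    try {
        coroutineScope {
            cancel() // cancel this scope's Job before collecting
            outcome = try {
                flow {
                    emit("a")
                    delay(10) // a real, cancellable suspension point
                    emit("b")
                }.toList().joinToString()
            } catch (e: CancellationException) {
                "cancelled"
            }
        }
    } catch (e: CancellationException) {
        // The cancelled scope itself completes with CancellationException.
        // Swallowed here only to keep the demo self-contained.
    }
    return outcome
}
```

Because delay() checks the Job on entry, toList() throws CancellationException here instead of returning both elements.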

For example, let's put Flow aside:

coroutineScope {
    cancel()
    val channel = Channel<Int>(1) // <- buffer of size 1
    channel.send(1)
    channel.receive()
    println("Done")
}

This code has two suspending calls after cancel(), yet it completes normally. Does that imply that Channel is not cancellable? It does not. Some code paths run without actually suspending, and we do not check for cancellation on such paths for the sake of performance. Such paths are usually short and fast, so this is generally acceptable. When they are not, ensureActive/isActive should be used.
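As an illustration of that last sentence, here is a small sketch of a loop with no suspension points that checks cancellation explicitly. The function countUntilCancelled and its parameters are hypothetical, and the cancel() call inside the loop only simulates a cancellation signal arriving from elsewhere.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs a non-suspending loop and returns how many
// iterations completed before cancellation was observed.
suspend fun countUntilCancelled(limit: Int, cancelAt: Int): Int {
    var done = 0
    try {
        coroutineScope {
            repeat(limit) { i ->
                if (i == cancelAt) cancel() // simulate an external cancellation
                ensureActive()              // throws CancellationException once the Job is cancelled
                done++
            }
        }
    } catch (e: CancellationException) {
        // Swallowed only so the demo can return the counter.
    }
    return done
}
```

Without the ensureActive() call the loop would run all the way to limit, since nothing else in its body looks at the Job.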

Yes, if instead of flowOf(1, 2, 3).onEach { delay(1000) }.toList() one writes flowOf(1, 2, 3).onEach { Thread.sleep(1000) }.toList(), cancellation will take 3 seconds to kick in. For the rare cases where it is really necessary, we can provide a special operator, e.g. fun <T> Flow<T>.checked(): Flow<T> = onEach { coroutineContext.ensureActive() }
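A possible shape for such an operator, as a sketch only: checked() is the name proposed in this thread, not an existing library function, and the version below reads the Job from coroutineContext because the onEach lambda has no CoroutineScope receiver. The helper collectChecked is hypothetical and cancels its own scope on the second element to simulate a cancellation arriving mid-collection.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.coroutineContext

// Sketch of the operator proposed above: check the collector's Job before
// passing each element downstream.
fun <T> Flow<T>.checked(): Flow<T> = onEach { coroutineContext.ensureActive() }

// Hypothetical usage: returns the elements that were collected before
// cancellation was observed.
suspend fun collectChecked(): List<Int> {
    val seen = mutableListOf<Int>()
    try {
        coroutineScope {
            flowOf(1, 2, 3)
                .onEach { if (it == 2) cancel() } // simulate cancellation mid-collection
                .checked()                        // throws CancellationException once cancelled
                .collect { seen += it }
        }
    } catch (e: CancellationException) {
        // Swallowed only so the demo can return what was collected.
    }
    return seen
}
```

In this sketch only the first element reaches the collector, because the check runs before the second element is passed downstream.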

> Also, some suspension points might not be cancellable (although they're quite rare).

Sure, but it does not change my point and I'm too lazy to explicitly mention all potential (very rare) exceptions :)

LouisCAD commented 5 years ago

I think adding a checked() or similar operator would be a good idea, even if it is almost never used. Its documentation would describe the cases where you might want it, so curious people could easily discover the risks of infinite non-cancellable flows (like flow { while (true) nonCancellableFunction() }) and have an easy fix.

qwwdfsad commented 5 years ago

The operator is easy to implement, but first, let's see if there is any demand for it.