Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

SendChannel.trySendBlocking() hangs thread upon callbackFlow channel close when unbuffered or buffer full #3952

Open bubenheimer opened 9 months ago

bubenheimer commented 9 months ago

Describe the bug

SendChannel.trySendBlocking() hangs the calling thread indefinitely when an unbuffered callbackFlow channel closes. Instead I expect trySendBlocking() to fail with an appropriate channel-related exception in a timely fashion once the channel closes.

Coroutines version 1.7.3

Provide a Reproducer

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

var callback: ((Int) -> Unit)? = null

val thread = Thread {
    repeat(Int.MAX_VALUE) {
        println("Calling $it")
        callback!!(it)
    }
}

val flow = callbackFlow {
    callback = {
        println("Sending $it")
        try {
            trySendBlocking(it).exceptionOrNull()?.let { println(it) }
        } catch (t: Throwable) {
            println("Send fail: $t")
            throw t
        }
        println("Sent")
    }

    thread.start()

    awaitClose {
        GlobalScope.launch {
            repeat(5) {
                println("channel.isClosedForSend: ${channel.isClosedForSend}")
                delay(1_000L)
            }
        }
    }
}

runBlocking {
    val job = launch {
        flow
                .buffer(RENDEZVOUS)
                .transform {
                    emit(it)
                    delay(1_000L)
                }
                .onCompletion { println("Flow collection complete: $it") }
                .collect {
                    println("Received $it")
                }
    }

    println("Delaying")
    delay(5_000L)
    println("Cancelling")
    job.cancel()
    println("Delaying")
    delay(10_000L)
    println("Interrupting")
    thread.interrupt()
    println("Delaying")
    delay(10_000L)
    println("Done")
}

Output below:

Delaying
Calling 0
Sending 0
Sent
Calling 1
Sending 1
Received 0
Received 1
Sent
Calling 2
Sending 2
Received 2
Sent
Calling 3
Sending 3
Received 3
Sent
Calling 4
Sending 4
Received 4
Sent
Calling 5
Sending 5
Cancelling
Delaying
channel.isClosedForSend: false
Flow collection complete: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@77b987f3
channel.isClosedForSend: true
channel.isClosedForSend: true
channel.isClosedForSend: true
channel.isClosedForSend: true
Interrupting
Delaying
Send fail: java.lang.InterruptedException
Done

The output shows that the callback thread hangs after the channel closes until it is forcibly interrupted from another thread.

bubenheimer commented 9 months ago

Same problem with a (filled) default buffered Channel, or runBlocking() + send().

The thread was still hanging after 5 minutes.

I assume the issue occurs whenever the sender blocks due to channel send suspending, followed by the channel being closed.

Channel.cancel() instead of Channel.close() appears to work around the issue.
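
The difference between close() and cancel() for a sender that is already suspended can be sketched as follows. This is a minimal illustration, not library code: the helper name suspendedSenderCompletes and the timeout values are made up for the example, and it assumes the behaviour observed above on 1.7.3.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.coroutines.yield

// Hypothetical helper: returns true if a sender that is already suspended in send()
// finishes within timeoutMs after `shutdown` has been applied to the channel.
fun suspendedSenderCompletes(timeoutMs: Long, shutdown: (Channel<Int>) -> Unit): Boolean = runBlocking {
    val ch = Channel<Int>() // rendezvous: send() suspends until a receiver arrives
    val sender = launch {
        try {
            ch.send(1)
        } catch (e: CancellationException) {
            // Reached when the channel or this coroutine is cancelled.
        }
    }
    yield() // let the sender start and suspend in send()
    shutdown(ch)
    val completed = withTimeoutOrNull(timeoutMs) { sender.join() } != null
    sender.cancel() // clean up so that runBlocking can return in the close() case
    completed
}

fun main() {
    println("after close():  " + suspendedSenderCompletes(200L) { it.close() })
    println("after cancel(): " + suspendedSenderCompletes(200L) { it.cancel() })
}
```

With close() the suspended sender is expected to stay suspended until the timeout, while cancel() resumes it with a CancellationException.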

qwwdfsad commented 8 months ago

This behaviour is by design, although it can be tricky to spot behind all the other machinery.

Basically, it boils down to the following example:

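import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

runBlocking {
    val ch = Channel<Unit>() // rendezvous channel: send() suspends until a receiver arrives
    launch {
        println("Before")
        ch.send(Unit) // suspends here; nobody ever receives
        println("After") // never printed
    }

    yield() // let the launched coroutine run until it suspends in send()
    println("Closing")
    ch.close() // does not resume the already-suspended sender
    println("Closed")
} // runBlocking never returns: it waits for the child that is still suspended in send()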

The close contract states:

Immediately after invocation of this function, [isClosedForSend] starts returning true. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive] on the side of [ReceiveChannel] starts returning true only after all previously sent elements are received.

In fact, we spent quite some time figuring out whether this behaviour should be the default one. These are the key takeaways:

The approach has intrinsic downsides as well, though: code that sends concurrently and is not properly synchronized with close is inherently racy, sendBlocking might deadlock, and there are scenarios where one expects send to fail immediately. In the current state of the library, however, this behaviour cannot be changed, as it would constitute a major breaking change with vaguely defined impact (on a scale from "nobody notices" to "end-user applications start crashing at runtime").
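
The close contract quoted above can be observed on a small buffered channel. This is only a sketch: the function name closeContractStates is made up for the example, and both opt-in markers are listed because the marker on these properties has differed between library versions.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the channel state right after close() and after draining.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun closeContractStates(): List<Boolean> = runBlocking {
    val ch = Channel<Int>(capacity = 1)
    ch.send(1) // fits into the buffer, so send() does not suspend
    ch.close()
    val closedForSend = ch.isClosedForSend              // set immediately by close()
    val closedForReceiveBefore = ch.isClosedForReceive  // element 1 is still pending
    ch.receive()                                        // drain the pending element
    val closedForReceiveAfter = ch.isClosedForReceive   // nothing left to receive
    listOf(closedForSend, closedForReceiveBefore, closedForReceiveAfter)
}

fun main() {
    println(closeContractStates())
}
```

Per the contract, the three values should be true, false, true: the send side closes at once, while the receive side closes only after the buffered element has been taken.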

qwwdfsad commented 8 months ago

Regarding the particular callbackFlow use-case, I'd recommend using the advice from its documentation:

Using [awaitClose] is mandatory in order to prevent memory leaks when the flow collection is cancelled,
otherwise the callback may keep running even when the flow collector is already completed.

In your scenario, the machinery might look like this:

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.callbackFlow

var callback: ((Int) -> Boolean)? = null // isDone

val thread = Thread {
    repeat(Int.MAX_VALUE) {
        if (callback!!(it)) {
            return@Thread
        }
    }
}

val flow = callbackFlow {
    callback = {
        println("Sending $it")
        try {
            trySendBlocking(it).exceptionOrNull()?.let { println(it) }
            println("Sent")
            false
        } catch (e: InterruptedException) {
            Thread.interrupted()
            println("Callback terminated")
            true
        } catch (t: Throwable) {
            println("Send fail: $t")
            throw t
        }
    }

    thread.start()

    awaitClose {
        thread.interrupt()
        thread.join()
    }
}

For more sophisticated use cases, we also have the runInterruptible function.
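
For reference, a minimal sketch of what runInterruptible does: cancelling the coroutine interrupts the thread that is executing the blocking block. The function name blockingCallIsInterruptedOnCancel is made up for the example, and Thread.sleep merely stands in for an arbitrary blocking call.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: returns true if the blocking call saw an InterruptedException
// after the surrounding coroutine was cancelled.
fun blockingCallIsInterruptedOnCancel(): Boolean = runBlocking {
    val started = CompletableDeferred<Unit>()
    val interrupted = AtomicBoolean(false)
    val job = launch {
        runInterruptible(Dispatchers.IO) {
            started.complete(Unit) // signal that the blocking section has been entered
            try {
                Thread.sleep(60_000L) // stands in for any blocking, interruptible call
            } catch (e: InterruptedException) {
                interrupted.set(true)
                throw e // runInterruptible converts this into a CancellationException
            }
        }
    }
    started.await()     // make sure the block is running before cancelling
    job.cancelAndJoin() // cancellation interrupts the blocked thread
    interrupted.get()
}

fun main() {
    println("interrupted on cancel: " + blockingCallIsInterruptedOnCancel())
}
```

Note that this only helps when the blocking code runs on a thread the coroutine itself controls, which is not the case for callbacks invoked by an external framework.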

bubenheimer commented 7 months ago

@qwwdfsad I don't think this issue should be closed. Thank you for digging into the technical implementation details, but I don't think it addresses the usability issue of the API.

Perhaps I have not made the full extent of the problem clear. The current implementation of callbackFlow looks exceedingly dangerous in general, as it is likely to cause intermittent hangs, and it does not carry warnings to this end. A crash here would be great by comparison, but all it does is hang and make the problem invisible, which helps explain why it was not reported before.

The key ingredients necessary to create this problem are callbackFlow() with a buffer of finite size and trySendBlocking(). This is a common combination.

Cancelling the channel instead of closing it does not seem to cause this issue. There may be reasons not to do this; I am not sure. Otherwise it would seem appropriate to replace the current callbackFlow() channel.close() approach with something less dangerous and deprecate callbackFlow().

Regarding the Thread.interrupt() workaround: isn't this possible here only because the caller has a great amount of knowledge about the other thread and control over it? This would seem rare when using callbackFlow() - callbackFlow() is more for callbacks from legacy code. I generally use callbackFlow() with Android framework callbacks. Interrupting Android framework threads or handling their interrupts myself seems a bad idea, likely to cause other problems. In many cases I have no control over those threads at all.

I don't see how runInterruptible() would help. The typical use of callbackFlow just passes a value from a legacy callback to a channel.

In my actual use case I use trySendBlocking() and a small buffer to create backpressure, which is why I was able to isolate the problem. In other cases the result would be intermittent, random hangs and developer frustration.

bubenheimer commented 7 months ago

@qwwdfsad one question about another possible workaround, based on your technical analysis: it sounds like it would help to retrieve all remaining elements from the Channel in awaitClose() until I see isClosedForReceive. Would this be guaranteed to unblock the sender? If so, it may be the best workaround in my case.

Edit: I see that I only get a SendChannel in callbackFlow, so this is not possible to do as an API user, and I cannot cancel the channel either.

bubenheimer commented 7 months ago

@qwwdfsad just to be clear, in actual usage I deregister the callback in awaitClose(). This does not typically imply that the underlying framework will attempt to interrupt blocked callback threads, it usually just discontinues future use of the callback.
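
The usage pattern described here can be sketched like this. LegacySource and its register/unregister/fire methods are hypothetical stand-ins for a callback API and do not belong to any real framework; asFlow is likewise an illustrative name.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for a legacy callback API.
class LegacySource {
    private val listeners = CopyOnWriteArrayList<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (Int) -> Unit) { listeners.add(listener) }
    fun unregister(listener: (Int) -> Unit) { listeners.remove(listener) }
    fun fire(value: Int) { for (listener in listeners) listener(value) }
}

// Bridges the callback API into a Flow and deregisters the callback in awaitClose().
fun LegacySource.asFlow(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value ->
        // Blocks the calling thread while the buffer is full. Deregistering below
        // does not unblock a call that is already waiting here.
        trySendBlocking(value)
    }
    register(listener)
    awaitClose { unregister(listener) }
}
```

Deregistration only stops future invocations of the callback; a thread that is already blocked inside trySendBlocking() when the flow completes is unaffected by it.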

qwwdfsad commented 7 months ago

Thanks for pursuing this. I'm reopening the issue and will answer later.