Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Support for `runInterruptible` in Kotlin/Native (POSIX) targets #3563

Closed: kevincianfarini closed this issue 1 year ago

kevincianfarini commented 1 year ago

Use case

I am attempting to create bindings for io_uring in Kotlin/Native which lean on cancellable coroutines. The shape of my API looks roughly like this

interface IOUring {
  public suspend fun someSystemCall()
}

Internally, the implementation acquires the continuation for someSystemCall, prepares some system calls, submits them to the kernel, and then suspends. Each prepared submission carries a pointer to the continuation; when the kernel completes the operation, that pointer is handed back to user space and the continuation is resumed.

Polling for completion events in io_uring this way has to be offloaded onto a separate thread. I do this by passing a scope into my implementation, which launches a polling job.

public class URing(scope: CoroutineScope) {

  private val job: Job

  init {
      job = scope.launch { poll() }.apply {
          invokeOnCompletion { /* clean up */ }
      }
  }

  private suspend fun poll() { /* omitted */ }
}

The poll function here invokes io_uring_wait_cqe, which is a blocking call. Until recently, it looked like the following:

suspend fun poll() = withContext(newSingleThreadContext("io_uring poll")) {
  while (isActive) {
    io_uring_wait_cqe(...)
  }
}

There is a subtle bug in the above code. If either the parent scope or the member job is cancelled and there are no remaining completion queue events to process, the io_uring instance is torn down, io_uring_wait_cqe never produces another result (it blocks indefinitely), and the isActive loop condition is never checked again. The result is a program deadlock.
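The deadlock can be reproduced on the JVM without io_uring. The sketch below is an analogy, not the io_uring code: a hypothetical FakeCompletionQueue whose waitCqe() blocks in LinkedBlockingQueue.take() stands in for io_uring_wait_cqe, and an AtomicBoolean stands in for isActive. Clearing the flag has no effect while the thread is parked waiting for an event that never arrives.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

// Hypothetical stand-in for the completion queue: waitCqe() blocks until an
// event is available, much like io_uring_wait_cqe blocks until a CQE exists.
class FakeCompletionQueue {
    private val events = LinkedBlockingQueue<Int>()
    fun waitCqe(): Int = events.take()
}

// Mirrors the buggy loop: `active` is only re-checked after waitCqe() returns.
fun startBuggyPoller(queue: FakeCompletionQueue, active: AtomicBoolean, blocked: CountDownLatch): Thread =
    thread(isDaemon = true) {
        try {
            while (active.get()) {
                blocked.countDown() // about to block inside waitCqe()
                queue.waitCqe()
            }
        } catch (e: InterruptedException) {
            // Only reached because the demo below forcibly interrupts the thread.
        }
    }

// Returns true if the poller is still stuck after being "cancelled".
fun pollerIgnoresCancellation(): Boolean {
    val active = AtomicBoolean(true)
    val blocked = CountDownLatch(1)
    val poller = startBuggyPoller(FakeCompletionQueue(), active, blocked)
    blocked.await()
    active.set(false)  // the equivalent of cancelling the scope
    poller.join(200)   // no event ever arrives, so waitCqe() never returns
    val stuck = poller.isAlive
    poller.interrupt() // clean up the demo thread
    return stuck
}
```

pollerIgnoresCancellation() returns true: the poller is still alive 200 ms after being "cancelled", and the demo can only free it by interrupting the thread.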

Since runInterruptible is not available on Native, I've had to fall back on an arbitrary timeout that continuously spins so that we avoid deadlock.

suspend fun poll() = withContext(newSingleThreadContext("io_uring poll")) {
  val timeout = /* 100 milliseconds */
  while (isActive) {
    io_uring_wait_cqe_timeout(..., timeout) // blocks for 100ms or until kernel gives us a value.
  }
}

This is undesirable for multiple reasons:

  1. This timeout feels arbitrary and clumsy.
  2. This will needlessly spin in the background and waste CPU resources.
  3. This imposes a cancellation delay of up to 100ms on calling code, which is especially noticeable in tests.
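The timeout workaround, and the latency it trades for liveness, can be sketched the same way. In this hypothetical JVM analogue, LinkedBlockingQueue.poll(timeout, unit) plays the role of io_uring_wait_cqe_timeout: the loop re-checks its flag at least once per timeout, so cancellation is noticed within roughly one timeout, while an idle thread still wakes up about ten times per second at a 100 ms timeout.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

// Polls with a timeout, analogous to io_uring_wait_cqe_timeout: poll() returns
// null when the timeout expires, which gives the loop a chance to re-check `active`.
fun startTimeoutPoller(
    events: LinkedBlockingQueue<Int>,
    active: AtomicBoolean,
    timeoutMs: Long,
    onEvent: (Int) -> Unit,
): Thread = thread(isDaemon = true) {
    while (active.get()) {
        val event = events.poll(timeoutMs, TimeUnit.MILLISECONDS)
        if (event != null) onEvent(event)
    }
}

// Returns how long (in ms) the poller took to exit after being "cancelled".
fun measureCancellationLatencyMs(timeoutMs: Long): Long {
    val active = AtomicBoolean(true)
    val poller = startTimeoutPoller(LinkedBlockingQueue(), active, timeoutMs) {}
    Thread.sleep(timeoutMs / 2) // let the poller settle into a timed wait
    val start = System.nanoTime()
    active.set(false)
    poller.join()               // terminates, but only after the current wait expires
    return (System.nanoTime() - start) / 1_000_000
}
```

The measured latency grows with timeoutMs; shrinking the timeout reduces latency only by waking the thread more often.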

Ideally, I could write the following code on Native just as I would on the JVM:

suspend fun poll() = runInterruptible(newSingleThreadContext("io_uring poll")) {
  while (true) {
    io_uring_wait_cqe(...)
  }
}

This code would respect the "prompt cancellation guarantee" as quickly as the underlying thread can respond to an interrupt. On POSIX systems I assume this could be achieved with signals, but I am unsure. On MinGW I have no idea what would be required.

The Shape of the API

This would be exactly the same as the JVM definition of runInterruptible, but available on all other platforms except JS.

dkhalanskyjb commented 1 year ago

First of all, it really makes me personally glad to see our library being used in such a manner. I think this is a great idea!

Now, to your request. How would runInterruptible help here? In Java, there's a well-established concept of a thread being interrupted, and most blocking operations respect interruption and know to stop blocking when it happens. I haven't dug into io_uring, but from https://man.archlinux.org/man/io_uring_wait_cqe.3.en I don't see any mention of io_uring_wait_cqe knowing how to be interrupted. Which is unsurprising, given that io_uring_wait_cqe should be a level below POSIX, right?

kevincianfarini commented 1 year ago

@dkhalanskyjb Thank you for the kind words!

Now, to your request. How would runInterruptible help here? In Java, there's a well-established concept of a thread being interrupted, and most blocking operations respect interruption and know to stop blocking when it happens.

For some context, I am using liburing, which is a small wrapper around the io_uring kernel API that makes things a bit easier. io_uring_wait_cqe delegates to a function that takes a sigmask, presumably to allow interruption via signals.

/*
 * Helper for the peek/wait single cqe functions. Exported because of that,
 * but probably shouldn't be used directly in an application.
 */
int __io_uring_get_cqe(struct io_uring *ring,
            struct io_uring_cqe **cqe_ptr, unsigned submit,
            unsigned wait_nr, sigset_t *sigmask);

You can see that implementation here. I've opened a GitHub discussion on the liburing repo about cancellation via signals here.

To your original point

and most blocking operations respect interruption and know to stop blocking when it happens

This is not as universally implemented in native routines as it is on the JVM. I wonder if that would change the shape of a resulting API at all?

dkhalanskyjb commented 1 year ago

What do you think about such a solution?

import kotlinx.coroutines.*
import kotlin.coroutines.*

@Volatile // written by the cancelling thread, read by the polling thread
var finish = false

fun interruptOperation() {
    finish = true
}

fun blockingOperation() {
    while (!finish) {
        Thread.sleep(100)
    }
}

fun main() {
    runBlocking {
        val pollingThread = newSingleThreadContext("poll")
        pollingThread.use {
            val job = launch {
                poll(pollingThread)
            }
            delay(1000)
            job.cancelAndJoin()
        }
    }
}

// THE IMPORTANT PART IS BELOW
suspend fun poll(pollingThread: CoroutineContext): Nothing = withContext(pollingThread) {
    suspendCancellableCoroutine { cont ->
        cont.invokeOnCancellation {
            interruptOperation()
        }
        while (isActive) {
            blockingOperation()
        }
    }
}

If you manage to convince liburing to provide some mechanism to interrupt that operation, you can detect cancellation and perform the interruption.

kevincianfarini commented 1 year ago

Hm, unfortunately I don't think that solves the issue. The communication between interruptOperation and blockingOperation is still contingent on the blocking operation periodically checking whether it should finish. As far as I can tell, this solution isn't much different from the one I have now.

suspend fun poll() = withContext(newSingleThreadContext("io_uring poll")) {
  val timeout = /* 100 milliseconds */
  while (isActive) {
    io_uring_wait_cqe_timeout(..., timeout) // blocks for 100ms or until kernel gives us a value.
  }
}

Both solutions require spinning in the background on some interval. I am particularly concerned that the resulting delay will make cancellation very slow. Without using signals, I don't know whether this is avoidable.


Are there larger concerns with leveraging OS signals in Kotlin/Native? I don't know how the Worker API is implemented internally, but I would think that Worker.requestTermination would use signals under the hood on POSIX.

This conversation has so far not mentioned MinGW, but I'm also curious whether that would complicate possible thread interrupts.

From what I can tell, there are two possible scenarios for runInterruptible in Kotlin/Native.

  1. The block is implemented entirely in Kotlin, and runInterruptible would register signal handlers internally to interrupt the operation during cancellation. For example:
suspend fun foo() = runInterruptible {
  while (true) { println("bar") }
}
  2. The block makes calls to external functions like io_uring_wait_cqe. These functions would have to respect signals like SIGINT and SIGKILL, or else we still run the risk of deadlock.

dkhalanskyjb commented 1 year ago

The communication between interruptOperation and blockingOperation is still contingent on blocking operation periodically checking if it should finish. 

No-no, that's not what I mean.

Disregard what's written in blockingOperation and interruptOperation; it's just an example of a simple blocking operation. It could be Thread.sleep / sleepingThread.interrupt() instead, or, in your case, the io_uring operation / the code that sends a signal to the thread performing that operation.

kevincianfarini commented 1 year ago

@dkhalanskyjb I've put together the following, which I think implements what you're describing, but without success.

private suspend fun poll(): Nothing {
    @OptIn(ExperimentalCoroutinesApi::class)
    val workerThread = newSingleThreadContext("io_uring thread")
    withContext(context = CoroutineName("io_uring poll job") + workerThread) {
        suspendCancellableCoroutine<Nothing> { cont ->
            cont.invokeOnCancellation { workerThread.close() }
            memScoped {
                val cqe = allocPointerTo<io_uring_cqe>()
                while (true) { resumeContinuation(cqe) }
            }
        }
    }
}

I can confirm that the workerThread.close() is being invoked, it's just that the underlying worker backing this dispatcher isn't able to interrupt the blocking io_uring_wait_cqe call.

Would providing a proof-of-concept of cancellation here backed by pthread_kill be of any use?

dkhalanskyjb commented 1 year ago

Let's take a step back, as we're clearly speaking past each other.

We're not magicians, and so we cannot, even in theory, do anything that C code cannot do. If io_uring_wait_cqe doesn't recognize the concept of interruptions, then that's it; we can't magically force it to.

What would you do in C code to interrupt a thread running some blocking operation? Clearly, this has to be done from another thread, so we're talking about interprocess communication. This could be shared memory, sending signals, etc.

The thread running io_uring_wait_cqe would need to somehow react to the interprocess communication. So io_uring_wait_cqe needs to know how to react; there's just no way around this.

You suggest pthread_kill, but this won't help in this case just yet. There are two kinds of reactions to a signal: "somehow process" and "drop everything and terminate." If there's a signal to which io_uring_wait_cqe reacts by no longer waiting, then, sure, you could send that signal. As I see it, there's no such signal. So, any implementation that you would be able to write would cause the "drop everything and terminate" reaction, like, in the extreme case, SIGKILL. This may be fine for your use case, actually, but not for a general-purpose library. Consider the following code:

try {
  io_uring_wait_cqe(...) // hangs
} finally {
  println("This code must execute unconditionally")
}

If we just kill the thread, who will execute the finally block? Just dropping the code that follows io_uring_wait_cqe is not an option, because that finally block could perform some important resource cleanups.
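For comparison, this is why interruption is safe on the JVM, where runInterruptible relies on thread interruption: Thread.interrupt() makes blocking calls such as Thread.sleep throw InterruptedException, so the stack unwinds normally and finally blocks run. The sketch below is plain Kotlin/JVM with no coroutines; Thread.sleep stands in for a call that would otherwise hang.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

// Returns true if the finally block ran after the blocked thread was interrupted.
fun interruptionRunsFinally(): Boolean {
    val finallyRan = AtomicBoolean(false)
    val started = CountDownLatch(1)
    val worker = thread {
        try {
            started.countDown()
            Thread.sleep(60_000) // stands in for a blocking call that "hangs"
        } catch (e: InterruptedException) {
            // Interruption arrives as an exception, so control flow unwinds normally...
        } finally {
            finallyRan.set(true) // ...and cleanup code still executes.
        }
    }
    started.await()
    worker.interrupt() // if this lands before sleep() starts, sleep() throws immediately
    worker.join()
    return finallyRan.get()
}
```

A signal whose only effect is to terminate the thread offers no such unwinding, which is the distinction drawn above.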

My suggestion was that, once liburing implements some interprocess communication that lets this function detect external interruptions (because there's no way around this), you do something like the following (pseudocode):

suspend fun poll(pollingThread: CoroutineContext): Nothing = withContext(pollingThread) {
    val thread = pthread_self() // the thread that is going to block
    suspendCancellableCoroutine { cont ->
        cont.invokeOnCancellation {
            pthread_kill(thread, SIGUSR1) // or any other signal that io_uring_wait_cqe recognizes as the signal that it must stop
        }
        while (isActive) {
            io_uring_wait_cqe(...)
        }
    }
}

You can probably use this right now with SIGKILL instead of SIGUSR1, and it may work in your case, but in general, killing threads violates the "this code will eventually execute" guarantees that we give.

kevincianfarini commented 1 year ago

After several months of not thinking about this in my io_uring project, I've come to a solution. It doesn't involve thread interrupts, thankfully, so I want to apologize for the initial bother.

I now synchronously peek to see whether a task has completed. If one has, I synchronously resume it; otherwise, I yield the current coroutine and check again later.

This is a much simpler design that doesn't involve any other thread at all. I'm posting this here hoping someone else will find this useful.
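A minimal sketch of the peek-then-yield loop, written as a JVM analogue rather than the actual io_uring code: ConcurrentLinkedQueue.poll() stands in for a non-blocking peek such as liburing's io_uring_peek_cqe, and Thread.yield() stands in for yielding the coroutine. The extra thread exists only to simulate the kernel completing operations; busyPollUntil and runBusyPollDemo are hypothetical names.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.concurrent.thread

// Peek without blocking; handle a completion if one is ready, otherwise yield and
// try again. Returns how many iterations found nothing ready (i.e. wasted work).
fun busyPollUntil(expected: Int, completions: ConcurrentLinkedQueue<Int>, onCompletion: (Int) -> Unit): Long {
    var handled = 0
    var idleIterations = 0L
    while (handled < expected) {
        val cqe = completions.poll() // non-blocking: null when nothing is ready
        if (cqe != null) {
            onCompletion(cqe)
            handled++
        } else {
            idleIterations++
            Thread.yield()           // stand-in for yielding the coroutine
        }
    }
    return idleIterations
}

fun runBusyPollDemo(): List<Int> {
    val completions = ConcurrentLinkedQueue<Int>()
    val results = mutableListOf<Int>()
    // Simulated "kernel" completing three operations, 10 ms apart.
    val kernel = thread {
        for (i in 1..3) {
            Thread.sleep(10)
            completions.add(i)
        }
    }
    val idle = busyPollUntil(3, completions) { results += it }
    kernel.join()
    println("idle iterations while waiting: $idle")
    return results
}
```

The printed idle count makes the cost of this design visible: every pass that finds nothing ready burns CPU time.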

kevincianfarini commented 1 year ago

Hey @dkhalanskyjb I ended up having to go back to the thread route again because busy-polling was a terrible idea :sweat_smile:. I managed to stumble into a working solution. Would you mind looking at it when you get a chance and maybe answer some of the questions I have? I was quite surprised by what worked and what didn't.

I've left a really detailed commit here: https://github.com/kevincianfarini/kotlin-io-uring/pull/2

dkhalanskyjb commented 1 year ago

I don't understand what's going on in that code.

It looks like the main idea behind your solution is that io_uring_wait_cqe does react to a signal with a signal handler set, namely by returning EINTR. So, the code in https://github.com/Kotlin/kotlinx.coroutines/issues/3563#issuecomment-1367383732 should work after these changes:

kevincianfarini commented 1 year ago

I finally came to a solution I am confident in to cooperatively cancel. There's no thread interruption, no POSIX signals, just good old cooperation.

private suspend fun cooperativelyCancel(): Nothing = suspendCancellableCoroutine { cont ->
    cont.invokeOnCancellation {
        // The ring has been canceled. In the event that there's no readily available
        // CQE to be processed in the queue, we prime the ring with a nop. This allows
        // the thread looping over completion events to unblock from io_uring_wait_cqe,
        // process the event, and then check for cancellation. Upon seeing that the scope
        // has been cancelled, we do not enter the loop again and thus gracefully exit.
        submissionQueueEvents.tryReceive().getOrNull()?.run {
            val sqe = getSubmissionQueueEvent()
            io_uring_prep_nop(sqe)
            // Explicitly set the data of this SQE as null. Calling io_uring_cqe_get_data
            // without first setting the data in a SQE results in an undefined return value.
            // In practice this will return a stale pointer to a continuation which has
            // been unpinned from memory. When we try to convert it back into a StableRef
            // the pointer has moved, and thus causes a segfault.
            //
            // Explicitly setting this data as null allows us to properly handle this nop
            // on the CQE process side without attempting to resume a bogus continuation.
            io_uring_sqe_set_data(sqe = sqe, data = null)
        }
        io_uring_submit(ring.ptr)
    }
}

With the help of @ephemient, I learned that this kind of cooperation is normally done by communicating over a file descriptor; Ktor does it here with a pipe. I could have done something similar with io_uring, but then it occurred to me that communicating over a pipe was no different from piping a value through the io_uring machinery itself and then checking for cancellation.

The above function is meant to run in its own coroutine that simply awaits cancellation. When that coroutine is cancelled, it ensures a nop is submitted to the ring, which wakes the polling thread so that it can check for cancellation and exit gracefully.
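The nop trick carries over to any blocking queue: the cancellation handler sets a flag and then enqueues an item with an empty payload, which wakes the waiting thread so it can observe the flag. Below is a JVM sketch with hypothetical FakeRing and Completion types standing in for the ring and its CQEs; the null payload mirrors io_uring_sqe_set_data(sqe, null).

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

// Hypothetical stand-in for a CQE: `data` plays the role of the user data set with
// io_uring_sqe_set_data. A null payload marks the wake-up nop.
class Completion(val data: (() -> Unit)?)

class FakeRing {
    private val completions = LinkedBlockingQueue<Completion>()
    private val cancelled = AtomicBoolean(false)

    fun submit(work: () -> Unit) = completions.put(Completion(work))

    // Plays the role of the invokeOnCancellation handler: cheap, non-blocking and
    // thread-safe. The flag is set before the nop is queued, so by the time the
    // poller dequeues the nop it is guaranteed to observe the flag.
    fun cancel() {
        cancelled.set(true)
        completions.offer(Completion(null))
    }

    // Blocks in take() with no timeout, like io_uring_wait_cqe; only a
    // completion (real or nop) can wake it up.
    fun startPoller(): Thread = thread(isDaemon = true) {
        while (!cancelled.get()) {
            val cqe = completions.take()
            cqe.data?.invoke() // null payload: the nop, nothing to resume
        }
    }
}

// Returns true if the poller exits once cancelled, even with no real work pending.
fun runNopDemo(): Boolean {
    val ring = FakeRing()
    val handled = CountDownLatch(3)
    val poller = ring.startPoller()
    repeat(3) { ring.submit { handled.countDown() } }
    handled.await()  // every real completion is processed; the queue is now empty
    ring.cancel()    // without the nop, take() would block forever here
    poller.join(5_000)
    return !poller.isAlive
}
```

The ordering in cancel() matters: if the nop were queued first, the poller could dequeue it, see the flag still unset, and block again.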

I'm going to close this issue, because in doing this I learned that interruption should most likely be cooperative, and that OS signals are mostly the wrong tool in Kotlin.
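For reference, the file-descriptor approach mentioned above (Ktor's pipe) can be sketched on the JVM with java.nio: the poller blocks in Selector.select() on the read end of a Pipe, and cancellation sets a flag and writes one byte to the write end. This illustrates the self-pipe technique in general and is not Ktor's code; on the JVM, Selector.wakeup() would achieve the same thing, and the pipe is used only to mirror the file-descriptor mechanism. PipeWakeup and runPipeDemo are hypothetical names.

```kotlin
import java.nio.ByteBuffer
import java.nio.channels.Pipe
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

class PipeWakeup {
    private val pipe = Pipe.open()
    private val selector = Selector.open()
    private val cancelled = AtomicBoolean(false)

    init {
        pipe.source().configureBlocking(false) // selectable channels must be non-blocking
        pipe.source().register(selector, SelectionKey.OP_READ)
    }

    // Blocks in select() with no timeout until the pipe becomes readable.
    fun startPoller(): Thread = thread(isDaemon = true) {
        val drain = ByteBuffer.allocate(16)
        while (!cancelled.get()) {
            selector.select()
            selector.selectedKeys().clear()
            drain.clear()
            pipe.source().read(drain) // consume the wake-up byte(s)
        }
    }

    // Cancellation: set the flag first, then make the pipe readable.
    fun cancel() {
        cancelled.set(true)
        pipe.sink().write(ByteBuffer.wrap(byteArrayOf(1)))
    }

    fun close() {
        selector.close()
        pipe.source().close()
        pipe.sink().close()
    }
}

// Returns true if the blocked poller exits after cancel().
fun runPipeDemo(): Boolean {
    val wakeup = PipeWakeup()
    val poller = wakeup.startPoller()
    wakeup.cancel()
    poller.join(5_000)
    val exited = !poller.isAlive
    wakeup.close() // closed only after the poller is done with the channels
    return exited
}
```

Structurally this is the same as the nop approach: some event the blocked call already waits on is produced on purpose, and the flag is checked once the call returns.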

The only open question I have regards the following guidance within the docs for invokeOnCancellation:

Note: Implementation of CompletionHandler must be fast, non-blocking, and thread-safe. This handler can be invoked concurrently with the surrounding code. There is no guarantee on the execution context in which the handler will be invoked.

io_uring_submit makes a syscall, and I don't know whether syscalls count as "fast".

dkhalanskyjb commented 1 year ago

The idea behind the "fast" requirement is that coroutines are lightweight. You're expected to have hundreds of them and to cancel them in bulk. There is no thread dedicated solely to processing cancellations, so whichever thread performs the cancellation may end up running all the cancellation handlers. If a cancellation handler were to take a long time, this could lead to unpredictable performance. All of this is probably insignificant in this case.