ktemkin / usrs

Universal Serial Rust; a pure-rust library for working with USB devices
Apache License 2.0
103 stars 2 forks source link

Async API design #1

Open kevinmehall opened 1 year ago

kevinmehall commented 1 year ago

Several years ago, I prototyped async support in libusb-rs, and others have been slowly getting a version of it into rusb.

While that patch initially didn't use Future because it was written before Rust had async/await, my experience with helping review other rusb async proposals is that trying to map libusb-like async callback APIs directly to returning a Rust Future representing the completion of a specific transfer is not an optimal API.

The main thing everyone does with libusb's async API is queuing several transfers on the same endpoint to keep the bus busy and maximize throughput. This pattern is kind of awkward to implement on top of an API where each transfer returns its own Future, though. There's a mismatch between having multiple Futures to poll and the fact that they're supposed to complete in FIFO order. The futures APIs are more oriented towards loosely coupled independent tasks running concurrently. This usage actually looks more like a Stream polling for the next item than a Future completing once, but with the extra complexity of explicitly handling both completion and submission. futures::FuturesUnordered gets you part of the way there as a way to wait for any of multiple futures, but that requires an extra dependency and some plumbing of futures to implement this most common pattern.

The other use case for an async API is integrating with an external event loop. With libusb, this is only truly possible on Linux (Windows and macOS need a background thread internal or external to libusb for event handling), and their C API for this is quite error-prone. In my opinion, there's also not a huge motivation for this with USB. Unlike with network servers, userspace USB drivers have a small and bounded number of endpoints and can easily launch a few dedicated threads to service them. It's still possible to integrate with Rust async IO reactors and executors if desired, but may not be the main use case to optimize for.

Cancellation is another thing to think about that it looks like usrs skips so far. Libusb pretends to have an API to cancel an individual transfer, but on Windows and MacOS, this actually cancels all pending transfers on the endpoint because that's what the underlying OS APIs allow (though cancelling all transfers is what you want most of the time anyway). Even where supported, the libusb-style individual cancellation has subtle pitfalls. For instance, you need to be sure to cancel OUT transfers in reverse order to avoid race conditions where some of the later writes make it to the device while previous transfers were cancelled. In Rust Future terms, where drop is cancellation, that means you can't just drop a collection of transfer futures to cancel all of them -- FuturesUnordered doesn't specify drop order, and Vec drops in forward order.

The other issue that kept coming up with Future for USB transfers is the Future API's inability to deal with borrowed data soundly in completion-based IO because it's safe to mem::forget a future without calling its destructor. Your code sidesteps this one by using only owned buffers, and while a lot of the allocation and locking usrs performs looks like it could be avoided, I think some form of owned buffers are the way to go.

I'm really excited about this project because a libusb replacement in Rust is something I've considered doing and started researching multiple times. Glad to see you actually doing it, and hopefully I can help in some way.

kevinmehall commented 1 year ago

As for what the API should be instead, I think the design space should be explored further, but here's one idea, expanded from one of my suggestions on rusb:

impl Device {
    /// Get a Pipe for the specified endpoint
    ///
    /// If a pipe for this endpoint already exists and has not been
    /// dropped, an error is returned.
    fn pipe(&self, endpoint: u8) -> Result<Pipe, ClaimPipeError>,
}

/// Buffer is essentially Vec<u8>, with an allocated capacity, and a portion that may
/// be initialized and safely accessed.
///
/// Could just *be* Vec<u8> but may be better to have a separate type for things like
/// DMA-able memory, an auto-ZLP flag, extra fields for multipacket ISO transfers, etc.
struct Buffer{ ... }

impl Pipe {
    /// Submit a new transfer on the endpoint.
    /// 
    /// For an IN endpoint, the transfer size is set by the *capacity* of
    /// the buffer, and the length and current contents are ignored. The
    /// buffer is returned from a later call to `complete` filled with
    /// the data read from the endpoint.
    ///
    /// For an OUT endpoint, the contents of the buffer are written to
    /// the endpoint.
    pub fn submit(&mut self, buf: Buffer) -> Result<(), TransferError>;

    /// Block waiting for the next pending transfer to complete, and return
    /// its buffer or an error status.
    ///
    /// For an IN endpoint, the returned buffer contains the data
    /// read from the device.
    ///
    /// For an OUT endpoint, the buffer is unmodified, but can be
    /// reused for another transfer.
    pub fn complete(&mut self, timeout: Option<Duration>) -> Result<Buffer, TransferError>;

    /// Get the number of transfers that have been submitted with
    /// `submit` that have not yet been returned from `complete`.
    pub fn pending_transfers(&self) -> usize;

    /// Get the number of transfers that have completed and are
    /// ready to be returned from `complete` without blocking.
    pub fn ready_transfers(&self) -> usize;

    /// Cancel all pending transfers on the endpoint pipe.
    /// TODO: maybe this should be on the `Device` or an object separable from the `Pipe`
    /// so it can be called from another thread, and cause a blocking `complete` call to
    //// immediately return.
    fn cancel_all(&mut self) -> Result<(), TransferError>;
}

fn read_example(device: &Device) -> Result<(), anyhow::Error> {
    let pipe = device.pipe(0x81)?;

    while pipe.pending_transfers() < 8 {
        pipe.submit(Buffer::with_capacity(8192));
    }

    loop {
        let buf = pipe.complete()?;
        process_data(&buf);
        pipe.submit(buf);
    }
}

fn write_example(device: &Device) -> Result<(), anyhow::Error> {
    let pipe = device.pipe(0x02)?;

    loop {
        let mut buf = if pipe.pending_transfers() < 8 {
            Buffer::with_capacity(8192)
        } else {
            pipe.complete()?
        };

        fill_data(&mut buf);

        pipe.submit(buf);
    }
}

One advantage of the nonblocking submission / blocking wait design is it means that you can call into OS-specific event waiting APIs from the blocking .complete() method directly instead of needing a background thread or an integration with an IO reactor to receive events. If there's a need to integrate with the Futures ecosystem, there could be an async version of the complete method returning a Future, or maybe have the Pipe itself implement Stream.

ktemkin commented 1 year ago

Hi! @Qyriad and I very much appreciate the input on this -- the async API is definitely something that's tough to get right. <3

I'm going to read over this carefully as soon as I get a chance -- just wanted to let you know it might be a couple of days before I have the bandwidth to get to this, since I'm in the middle of a $work hardware design project. :)

Thanks!

ktemkin commented 1 year ago

Apologies for the wall of text -- I wound up sitting with @Qyriad for a while, and we wound up with quite a lot of thoughts captured.

The main thing everyone does with libusb's async API is queuing several transfers on the same endpoint to keep the bus busy and maximize throughput. This pattern is kind of awkward to implement on top of an API where each transfer returns its own Future, though.

This is definitely a primary use case -- though I don't necessarily see a Future-based API as inherently awkward. One of @Qyriad and my goals, though, is to surface our backend functionality with multiple APIs, so you can choose between your "async paradigm" by enabling the appropriate feature.

There's a mismatch between having multiple Futures to poll and the fact that they're supposed to complete in FIFO order. The futures APIs are more oriented towards loosely coupled independent tasks running concurrently. This usage actually looks more like a Stream polling for the next item than a Future completing once, but with the extra complexity of explicitly handling both completion and submission. futures::FuturesUnordered gets you part of the way there as a way to wait for any of multiple futures, but that requires an extra dependency and some plumbing of futures to implement this most common pattern.

Futures may not be the ideal solution for a basic atom of USB async, but I'm very much of the opinion that providing a Future-wrapped interface to the end user is useful, if just because other people may already be using an async runtime and their own task evaluator; and I'd rather they not have to write an extra bit of bridging logic. I definitely don't think this should be the only interface-- and I kind of see the future of the library as having multiple feature-gated async APIs:

The main use case that performant USB-async needs to address involves reacting "as quickly as we can" from the userland, in order to pull whatever data we have into/out-of a transfer, and then rapidly resubmit the buffer/URB to the kernel, so a USB "queue" can keep flowing. I can see a few common ways of implementing that:

The last one honestly doesn't seem all that much more compelling than augmenting USBFuture with a non-async wait_for_completion method that just blocks to completion. I do see the benefit of having an API that manages the FIFO ordering for you; but I suspect we can create APIs that help the user to do that without fundamentally moving away from the other models.

I suspect that in a lot of cases, having something like BulkInSPSC and BulkOutMPSC (+ some buffer management) would actually provide a very nice common-case for users who really just want the common case of "I want to stream data to/from this USB device as fast as possible".

Cancellation is another thing to think about that it looks like usrs skips so far. Libusb pretends to have an API to cancel an individual transfer, but on Windows and MacOS, this actually cancels all pending transfers on the endpoint because that's what the underlying OS APIs allow (though cancelling all transfers is what you want most of the time anyway). Even where supported, the libusb-style individual cancellation has subtle pitfalls. For instance, you need to be sure to cancel OUT transfers in reverse order to avoid race conditions where some of the later writes make it to the device while previous transfers were cancelled. In Rust Future terms, where drop is cancellation, that means you can't just drop a collection of transfer futures to cancel all of them -- FuturesUnordered doesn't specify drop order, and Vec drops in forward order.

usrs is currently in a state of "us trying to figure out what APIs should look like" while I dogfood the library in my own work projects. This cancellation problem is one I've considered -- though I'm still between a few ideas of how to implement it. I think I'm in agreement with you that the cancellation shouldn't be something handled by individual futures -- though how exactly it should work is something we'll still need to give some thought.

I don't think this is at odds with a Future-based design -- successful cancellation of a whole pipe can just cause all of the existing futures to return with a TransferCancelled status, similar to how libusb handles issuing cancellation to its callbacks.

The other issue that kept coming up with Future for USB transfers is the Future API's inability to deal with borrowed data soundly in completion-based IO because it's safe to mem::forget a future without calling its destructor. Your code sidesteps this one by using only owned buffers, and while a lot of the allocation and locking usrs performs looks like it could be avoided, I think some form of owned buffers are the way to go.

Mm; I think this is a very good point.

I'd definitely consider switching to something where we take ownership of something like a Vec, and then return it to the user upon completion of the async operation. That simplifies a bunch of things on our side, and is more in line with the expectations of how one uses Rust. :)

The only downside is that we force users into working with a specific data type, instead of being able to accept something that impls AsRef or AsMut, but honestly I've been teetering back on forth about whether the sheer amount of API cruft entails is worth the flexibility, anyway.

(@qyriad points out that even using something like a stock RwLock wouldn't work here -- since you can't unlock an RwLock from a thread other than the one that locked it -- so our current API would need a bunch more work to be viable, anyway.)


All in all, I really appreciate the input -- we're definitely going to keep a bunch of your points in mind as we implement our API. I suspect we'll wind up with something that looks somewhere between "an extension of what we have now" and the API you propose above.