rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

Documenting cancel safety of `SinkExt::send()` #2754

Open jgallagher opened 1 year ago

jgallagher commented 1 year ago

I think SinkExt::send() is not cancel safe:

  1. When send() is called, the item is moved into a Send (and from there into its inner Feed)
  2. When the Send is polled, it waits until its feed is ready, which in turn waits until the underlying sink is ready.
  3. If the underlying sink is not ready and the Send is dropped before it becomes ready, the item will be dropped with the Send and never sent to the underlying sink.

I tried to confirm this with a custom Sink that is not ready for some period of time and a select! that tries to send() items to it with a timeout: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=2608a62c8695fbcfbb22d1856bfe930a. The first two items (which time out before the sink is ready) are never delivered.
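
The three steps above can be reproduced without the futures crate. What follows is a minimal std-only model, not the futures-rs implementation: `MiniSink`, `SlowSink`, `Feed` and `NoopWake` are hypothetical stand-ins, errors and flushing are left out, and the "cancel" is simulated by polling once by hand and then dropping the future, which is what a select does to the losing branch.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `futures::Sink` (no errors, no flush).
trait MiniSink<T> {
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()>;
    fn start_send(&mut self, item: T);
}

/// Test sink that is `Pending` for its first `stalls` readiness checks.
struct SlowSink {
    stalls: u32,
    received: Vec<u32>,
}

impl MiniSink<u32> for SlowSink {
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.stalls == 0 {
            return Poll::Ready(());
        }
        self.stalls -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }

    fn start_send(&mut self, item: u32) {
        self.received.push(item);
    }
}

/// Model of a by-value feed future: it owns the item until `start_send`.
struct Feed<'a, S, T> {
    sink: &'a mut S,
    item: Option<T>,
}

impl<S: MiniSink<T>, T: Unpin> Future for Feed<'_, S, T> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        if this.sink.poll_ready(cx).is_pending() {
            return Poll::Pending; // the item is still held inside the future
        }
        let item = this.item.take().expect("polled after completion");
        this.sink.start_send(item);
        Poll::Ready(())
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a feed once against a stalled sink, drops it, and reports what the sink got.
fn cancelled_feed_delivers() -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut sink = SlowSink { stalls: 1, received: Vec::new() };

    let mut feed = Feed { sink: &mut sink, item: Some(7u32) };
    assert!(Pin::new(&mut feed).poll(&mut cx).is_pending());
    drop(feed); // the cancellation: the item 7 is dropped together with the future

    // The sink is ready now, but there is nothing left to deliver.
    assert!(sink.poll_ready(&mut cx).is_ready());
    sink.received
}

fn main() {
    println!("delivered: {:?}", cancelled_feed_delivers());
}
```

In this model the sink ends up with an empty `received` list even though it became ready right after the drop, which matches the behaviour seen in the playground link.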

A couple of questions:

sunshowers commented 1 year ago

Going to be using feed in the rest of this comment to reduce the scope of the problem (send is built on top of feed and has all the issues described below).

So there's an apparent contradiction here with @Darksonn's comment at https://users.rust-lang.org/t/is-tokio-codec-framed-cancel-safe/86408/5. But I think there might be slightly different notions of cancel safety being talked about here.

In this case what we're concerned about is:

  1. sink.feed(item) is used as one branch of a select loop.
  2. The Feed future is dropped before being polled, or before the underlying Sink's poll_ready returns Poll::Ready.

Since item is stored in the Feed future and start_send is called here, if the Feed future is dropped before the start_send occurs, the message will certainly be lost.

This means that the most idiomatic way to use sink.feed() will cause messages to be lost.

I think one solution is to define new feed and send methods that take a &mut Option<T>, and the caller is expected to pass in a Some. Then the invariant would be that if the passed in Option becomes None, the item has been fed into the sink.

I'm going to try and implement these methods in a separate crate to give this a shot.

(This is akin to how tokio's AsyncRead::read_buf is cancel-safe.)
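
A rough sketch of the &mut Option<T> idea, written against a std-only model rather than the real futures::Sink. Everything named here (`MiniSink`, `SlowSink`, `TryFeed`, `NoopWake`) is a hypothetical stand-in, and cancellation is simulated by polling once by hand and dropping the future. The point it illustrates is the invariant above: the slot only becomes None at the moment the item is handed to the sink.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `futures::Sink` (no errors, no flush).
trait MiniSink<T> {
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()>;
    fn start_send(&mut self, item: T);
}

/// Test sink that is `Pending` for its first `stalls` readiness checks.
struct SlowSink {
    stalls: u32,
    received: Vec<u32>,
}

impl MiniSink<u32> for SlowSink {
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.stalls == 0 {
            return Poll::Ready(());
        }
        self.stalls -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }

    fn start_send(&mut self, item: u32) {
        self.received.push(item);
    }
}

/// Feed future that borrows the caller's slot instead of owning the item.
struct TryFeed<'a, S, T> {
    sink: &'a mut S,
    slot: &'a mut Option<T>,
}

impl<S: MiniSink<T>, T> Future for TryFeed<'_, S, T> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        if this.sink.poll_ready(cx).is_pending() {
            return Poll::Pending; // the item stays in the caller's slot
        }
        if let Some(item) = this.slot.take() {
            this.sink.start_send(item);
        }
        Poll::Ready(())
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Cancels one attempt, then retries with the same slot.
fn cancel_then_retry() -> (Option<u32>, Vec<u32>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut sink = SlowSink { stalls: 1, received: Vec::new() };
    let mut slot = Some(7u32);

    let mut first = TryFeed { sink: &mut sink, slot: &mut slot };
    assert!(Pin::new(&mut first).poll(&mut cx).is_pending());
    drop(first); // cancelled before the sink was ready
    let after_cancel = slot; // a copy, since Option<u32> is Copy

    let mut retry = TryFeed { sink: &mut sink, slot: &mut slot };
    assert!(Pin::new(&mut retry).poll(&mut cx).is_ready());
    assert!(slot.is_none()); // None means the item reached the sink
    (after_cancel, sink.received)
}

fn main() {
    println!("{:?}", cancel_then_retry());
}
```

The cost of this shape is on the caller's side: after a cancelled attempt the caller has to look at the slot to decide whether to retry.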

hawkw commented 1 year ago

> I think one solution is to define new feed and send methods that take a &mut Option<T>, and the caller is expected to pass in a Some. Then the invariant would be that if the passed in Option becomes None, the item has been fed into the sink.

Another potential new API, which might be a bit nicer to use than the &mut Option<T> approach, would be to add something like tokio::sync::mpsc::Sender's reserve() method. Sender::reserve() is an async fn which waits to reserve capacity in the channel (equivalent to calling Sink::poll_ready until it is ready), and returns a Permit which represents the acquired send capacity. Once the Permit is acquired, it can be consumed to synchronously send a value to the channel. Because the Permit::send operation is synchronous, it cannot be cancelled, and if the reserve future is cancelled, that's fine, because the value has not actually been sent yet.

In this API, the user awaits a reserve future and sends the item into the Sink only once capacity is reserved; if the future is cancelled, the item is never sent. This might be a more ergonomic design than one where the user passes a &mut Option<Sink::Item> and must manually check whether the Option is still Some to recover the item after a cancelled send.

For example, using a feed-Option type API, we would write code that looks like this:

// The sink.
let mut sink = /* ... */;
// The item to send.
let item = /* ... */; 

let mut item = Some(item);
futures::select! {
    // where `try_feed` is the new API
    _ = sink.try_feed(&mut item) => {
        // item sent, do other stuff...
    }

    _ = some_other_future() => {
        // do other stuff...
    }
}

if let Some(item) = item {
    // item is still there, `try_feed` was cancelled!
    // do something else...
}

On the other hand, with a reserve/Permit type API, we could write code like this, which I think is a bit easier to understand:

// The sink.
let mut sink = /* ... */;
// The item to send.
let item = /* ... */; 

futures::select! {
    Ok(permit) = sink.reserve() => {
        // we got a permit, send the item!
        permit.feed(item);

        // the item is sending, do other stuff...
        // (we can guarantee that the item has been fed to the sink,
        // since `Permit::feed` is synchronous and cannot be cancelled).
    }

    _ = some_other_future() => {
        // do other stuff...
    }
}

This also has an interesting side benefit over the Option API: with reserve(), the caller need not even construct the item to be sent until capacity has already been reserved. This may be beneficial in the case where constructing the item is expensive, or if it is a RAII guard that holds some limited resource.


Of course, this API does not handle the case where the poll_flush phase of sending an item to a Sink is cancelled, but the &mut Option<Sink::Item> API doesn't either; we can't provide any way of getting the item back once we've started sending it, so that's kind of out of scope. Cancel safety for the poll_flush phase is less important anyway, since our flush doesn't actually need to be the flush that's responsible for actually delivering the item: if we cancel the flush operation, some subsequent caller will probably flush before another call to poll_ready.

A Permit type could provide both a synchronous Permit::feed (or Permit::start_send?) that just starts sending the item, and an asynchronous Permit::send that sends the item and polls the sink to flush it. In either case, the item would not be lost if the poll_ready phase is cancelled, because acquiring the Permit (calling poll_ready) is a separate future from the flush operation that follows once the item has been handed to the permit.
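
To make the reserve/Permit shape concrete, here is a std-only model of it. This is a sketch under assumptions, not tokio's or futures-rs's code: `MiniSink`, `SlowSink`, `Reserve`, `Permit`, `reserve` and `NoopWake` are all hypothetical, errors and flushing are omitted, and cancellation is simulated by polling once by hand and dropping the future. It also shows the side benefit mentioned earlier: the item is only constructed after capacity has been reserved.

```rust
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `futures::Sink` (no errors, no flush).
trait MiniSink<T> {
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()>;
    fn start_send(&mut self, item: T);
}

/// Test sink that is `Pending` for its first `stalls` readiness checks.
struct SlowSink {
    stalls: u32,
    received: Vec<String>,
}

impl MiniSink<String> for SlowSink {
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.stalls == 0 {
            return Poll::Ready(());
        }
        self.stalls -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }

    fn start_send(&mut self, item: String) {
        self.received.push(item);
    }
}

/// Waits for capacity only; it never owns an item, so dropping it loses nothing.
struct Reserve<'a, S, T> {
    sink: Option<&'a mut S>,
    _item: PhantomData<fn(T)>,
}

/// Evidence that `poll_ready` returned `Ready`; consuming it sends synchronously.
struct Permit<'a, S, T> {
    sink: &'a mut S,
    _item: PhantomData<fn(T)>,
}

fn reserve<S: MiniSink<T>, T>(sink: &mut S) -> Reserve<'_, S, T> {
    Reserve { sink: Some(sink), _item: PhantomData }
}

impl<'a, S: MiniSink<T>, T> Future for Reserve<'a, S, T> {
    type Output = Permit<'a, S, T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let sink = this.sink.as_deref_mut().expect("polled after completion");
        if sink.poll_ready(cx).is_pending() {
            return Poll::Pending;
        }
        let sink = this.sink.take().expect("presence checked above");
        Poll::Ready(Permit { sink, _item: PhantomData })
    }
}

impl<S: MiniSink<T>, T> Permit<'_, S, T> {
    /// Not a future, so there is no await point at which it could be cancelled.
    fn feed(self, item: T) {
        self.sink.start_send(item);
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Cancels one reservation, then reserves again and feeds a freshly built item.
fn reserve_then_feed() -> (u32, Vec<String>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut sink = SlowSink { stalls: 1, received: Vec::new() };
    let mut items_built = 0;

    // First attempt: no capacity yet, and the future is dropped (cancelled).
    let mut attempt = reserve::<_, String>(&mut sink);
    assert!(Pin::new(&mut attempt).poll(&mut cx).is_pending());
    drop(attempt);

    // Second attempt: the permit arrives, and only now is the item built.
    let mut attempt = reserve::<_, String>(&mut sink);
    if let Poll::Ready(permit) = Pin::new(&mut attempt).poll(&mut cx) {
        items_built += 1;
        permit.feed(String::from("hello"));
    }
    (items_built, sink.received)
}

fn main() {
    println!("{:?}", reserve_then_feed());
}
```

Because `Reserve` holds no item, the caller has nothing to recover after a cancelled attempt, which is the ergonomic difference from the &mut Option<T> approach.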

sunshowers commented 1 year ago

@taiki-e I have an implementation working:

pub trait SinkExt<Item>: Sink<Item> {
    fn reserve(&mut self) -> Reserve<'_, Self, Item>
    where
        Self: Unpin,
    { /* ... */ }

    fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item>
    where
        Self: Unpin,
    { /* ... */ }
}

Reserve and FlushReserve do what their names suggest, and each returns a Permit:

#[derive(Debug)]
#[must_use]
pub struct Permit<'a, Si: ?Sized, Item> {
    sink: &'a mut Si,
    _phantom: PhantomData<fn(Item)>,
}

impl<'a, Item, Si: Sink<Item> + Unpin + ?Sized> Permit<'a, Si, Item> {
    pub fn feed(self, item: Item) -> Result<(), Si::Error> {
        Pin::new(self.sink).start_send(item)
    }

    pub fn send(mut self, item: Item) -> Result<Flush<'a, Si, Item>, Si::Error> {
        Pin::new(&mut self.sink).start_send(item)?;
        Ok(self.sink.flush())
    }
}

Would you be willing to accept this in the futures crate? The alternative is that we maintain our own crate that implements this.

sunshowers commented 1 year ago

I've put up https://github.com/oxidecomputer/cancel-safe-futures for this.

sunshowers commented 1 year ago

Just wanted to follow up and say that the reserve pattern, as implemented in cancel-safe-futures, has worked wonderfully for our use cases. We'd love for it to be upstreamed, but we're not blocked on it.