rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

Undoing a Sink::poll_ready #2109

Open jonhoo opened 4 years ago

jonhoo commented 4 years ago

Before we had Sink::poll_ready, senders always had to have an item ready to send before interacting with the Sink. This sometimes led to unnecessary buffering. The most trivial example of this is the current implementation of StreamExt::forward, which still adheres to the "have an item before you access the sink" model. It calls Stream::poll_next, and then calls Sink::poll_ready when it gets an element. Assuming poll_ready returns Poll::Ready, the element is sent, and all is good. The issue arises if Sink::poll_ready returns Poll::Pending. The implementation must now buffer that element somewhere, and re-try Sink::poll_ready on the next call to poll before it attempts to receive another element from the Stream (this is the buffered_item field on Forward). The upside of this approach is that when poll_ready returns Poll::Ready, we almost immediately call start_send and "consume" the slot that poll_ready told us was available.
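
To make the buffering concrete, here is a minimal sketch of the stream-first pattern. It is not the actual Forward implementation: ToySink and BufferedForward are hypothetical types, the stream is modelled as a VecDeque that is always ready, and poll takes no Context, so no waker is ever registered.

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Toy sink with a fixed capacity (hypothetical; not the futures-rs `Sink` trait).
struct ToySink {
    capacity: usize,
    items: Vec<u32>,
}

impl ToySink {
    /// Ready only while there is room for one more item.
    fn poll_ready(&mut self) -> Poll<()> {
        if self.items.len() < self.capacity {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }

    fn start_send(&mut self, item: u32) {
        self.items.push(item);
    }
}

/// Stream-first forwarding: pull an item, then ask the sink,
/// and park the item if the sink turns out to be full.
struct BufferedForward {
    stream: VecDeque<u32>, // stands in for a Stream that is always ready
    sink: ToySink,
    buffered_item: Option<u32>,
}

impl BufferedForward {
    fn poll(&mut self) -> Poll<()> {
        loop {
            // Retry a previously parked item before touching the stream again.
            if let Some(item) = self.buffered_item.take() {
                if self.sink.poll_ready().is_pending() {
                    self.buffered_item = Some(item);
                    return Poll::Pending;
                }
                self.sink.start_send(item);
            }
            match self.stream.pop_front() {
                Some(item) => self.buffered_item = Some(item),
                None => return Poll::Ready(()),
            }
        }
    }
}
```

With a capacity of two and three items in the stream, the first poll sends two items and returns Pending with the third parked in buffered_item, which is exactly the extra buffering described above.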

The alternative approach that Sink::poll_ready enabled is one that does not require any buffering, and it can be written out pretty easily:

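loop {
    // Reserve a slot in the sink before asking the stream for an item.
    ready!(self.sink.as_mut().poll_ready(cx))?;
    match ready!(self.stream.as_mut().poll_next(cx)) {
        // A slot is already reserved, so the item never needs buffering.
        Some(item) => self.sink.as_mut().start_send(item)?,
        None => break,
    }
}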

Here, we never need to buffer, since we only ever accept another item if we know we can send it immediately (I use this technique in tokio-tower for example).

Unfortunately, this example highlights a problem with Sink as it stands: we may now take a long time (potentially forever) between when we get Poll::Ready from poll_ready and when we call start_send. It might be that the client is simply not sending us any more requests. But the Sink has promised us a slot to send into, and we are still holding it. In the context of something like a bounded channel, this may mean that we are holding up one of the few slots in the channel, even though we will never use it (see #1312 and https://github.com/rust-lang/futures-rs/pull/984#issuecomment-383792953 for related discussion). If we have multiple of these loops (e.g., if we are forwarding multiple streams to clones of the same bounded mpsc::Sender), then we can easily end up in a livelock: all the slots of the bounded channel are held up by forwards for streams that do not have any elements to send, and as a result the forwards for streams that do have elements to send cannot make progress, since poll_ready returns Poll::Pending for them.
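
The starvation scenario can be reproduced with a toy model. This is a sketch under loud assumptions: Shared and ToySender are hypothetical single-threaded types, poll_ready takes no Context, and the reservation logic only loosely imitates what a bounded channel sender does.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::task::Poll;

/// Shared state of a toy bounded channel (hypothetical, single-threaded).
struct Shared {
    free_slots: Cell<usize>,
}

/// Toy sender whose `poll_ready` reserves a slot until an item is sent.
struct ToySender {
    shared: Rc<Shared>,
    reserved: bool,
}

impl ToySender {
    fn new(shared: &Rc<Shared>) -> Self {
        ToySender { shared: Rc::clone(shared), reserved: false }
    }

    /// Reserves a slot on success and keeps holding it afterwards.
    fn poll_ready(&mut self) -> Poll<()> {
        if self.reserved {
            return Poll::Ready(());
        }
        let free = self.shared.free_slots.get();
        if free == 0 {
            return Poll::Pending;
        }
        self.shared.free_slots.set(free - 1);
        self.reserved = true;
        Poll::Ready(())
    }
}

/// Two idle forwarders reserve both slots; a third one with data gets `Pending`.
fn busy_sender_is_starved() -> bool {
    let shared = Rc::new(Shared { free_slots: Cell::new(2) });
    let mut idle_a = ToySender::new(&shared);
    let mut idle_b = ToySender::new(&shared);
    let mut busy = ToySender::new(&shared);

    // Sink-first loops: each reserves a slot, then waits on a silent stream.
    let _ = idle_a.poll_ready();
    let _ = idle_b.poll_ready();

    // This sender has an item in hand but cannot get a slot.
    busy.poll_ready().is_pending()
}
```

Nothing in this model ever returns the two reserved slots, so the busy sender stays Pending for as long as the idle streams stay silent.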

This is something we've also run into in tower, whose Service trait features a similar poll_ready mechanism. What is needed here is some way to "undo" a Sink::poll_ready that returned Poll::Ready. In tower we've had two different proposals come up. The first (https://github.com/tower-rs/tower/issues/408) is to add a disarm_ready method. The idea here is that implementations of poll that discover that they won't use the slot given to them by poll_ready can "disarm"/give up that slot by calling the disarm_ready method. The second (https://github.com/tower-rs/tower/issues/412) is to have poll_ready return some kind of "token" that must then be passed to start_send (tower's equivalent is called call). This makes the API more misuse-resistant (you can't call start_send without calling poll_ready), and also gives us a way to give up a poll_ready slot: just drop the token.
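
As an illustration of the first proposal, here is a minimal sketch of what a disarm_ready method could look like on a toy sender. The type and its fields are hypothetical, and this is not the API proposed in the tower issue, only the general shape of "give the slot back".

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::task::Poll;

/// Toy sender sharing a slot counter with its clones (hypothetical type).
struct ToySender {
    free_slots: Rc<Cell<usize>>,
    reserved: bool,
}

impl ToySender {
    /// Reserves a slot unless one is already held.
    fn poll_ready(&mut self) -> Poll<()> {
        if !self.reserved {
            let free = self.free_slots.get();
            if free == 0 {
                return Poll::Pending;
            }
            self.free_slots.set(free - 1);
            self.reserved = true;
        }
        Poll::Ready(())
    }

    /// Gives the reserved slot back without sending; a no-op if nothing is reserved.
    fn disarm_ready(&mut self) {
        if self.reserved {
            self.free_slots.set(self.free_slots.get() + 1);
            self.reserved = false;
        }
    }
}

/// Returns whether the busy sender was ready before and after the idle one disarmed.
fn disarm_unblocks_other_sender() -> (bool, bool) {
    let free_slots = Rc::new(Cell::new(1));
    let mut idle = ToySender { free_slots: Rc::clone(&free_slots), reserved: false };
    let mut busy = ToySender { free_slots: Rc::clone(&free_slots), reserved: false };

    let _ = idle.poll_ready(); // idle takes the only slot
    let before = busy.poll_ready().is_ready(); // no slot left
    idle.disarm_ready(); // idle's stream was Pending, so it backs off
    let after = busy.poll_ready().is_ready(); // the slot is free again
    (before, after)
}
```

A sink-first forwarding loop would call disarm_ready whenever the stream returns Pending, and call poll_ready again the next time it is polled.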

Neither of these proposals is necessarily the right one, but I figured I would start discussion here since this is definitely an issue with Sink (and similar traits that include a poll_ready method), and one that must be fixed before the trait is considered finished.

jonhoo commented 4 years ago

I'll add to this that there is only a small number of impl Sink whose poll_ready actually does some kind of reservation. Off the top of my head: anything semaphore based (lock/rwlock) and anything based on a bounded channel (like mpsc::Sender). I don't know that that observation helps, but figured I'd put it out there.

jonhoo commented 4 years ago

The observation here is essentially that you cannot currently yield after calling poll_ready. You have to effectively treat every call to poll_ready as a possible semaphore acquisition, or face a potential deadlock (unless you have a concrete Sink type that you know this is not the case for). In practice, this means that you pretty much have to do whatever work ultimately gets you an item to send, and then poll_ready + immediately start_send. At which point the benefits of poll_ready are lost, and we may as well just have poll_start_send and dump poll_ready. Or, as argued above, we'll need something akin to disarm.

najamelan commented 4 years ago

I was just looking into the Sink API myself, and I found it rather hard to find prior discussions of why everything is the way it is, and why people don't like it (e.g., it's not being nominated for std even though Stream is).

As for poll_ready, the only thing I've been able to find is that @carllerche said it was important to be able to verify that an item can be processed before having to generate it to correctly implement service back pressure.

I haven't been able to find any more justification that this is actually the case. Maybe it is. In any case, the requirement has a considerable API cost which would benefit from proper justification and probably good documentation for implementers.

It also contrasts a bit with the discussion in the PR you linked above, where it seems @carllerche argues that keeping one slot per sender is fine and you should just call poll_ready from poll_flush to make sure back pressure happens. I feel it contrasts because keeping one slot per sender defeats the point of not buffering the items before they can be processed.

I can see a number of issues with the current Sink API. A semantically single operation is split across two methods, poll_ready and start_send, opening up a can of worms:

In practice, this puts constraints on both the implementer and the client, and those aren't really documented very well, nor compiler enforced. And as @jonhoo points out above, even the combinators in the futures lib don't really follow the "spirit" of poll_ready, and the deadlocking problem seems to demotivate adhering to it when generating the item takes some time.

Looking at tokio::sync::mpsc::Sender, it doesn't even implement Sink anymore. However, it still has both methods. It alleviates the problem somewhat by using a semaphore to count the available slots, which lets it reserve just one slot at a time. Compare that with a design built on a global lock, which deadlocks as soon as any one Sender sits between poll_ready and start_send. It also does not require senders to buffer a slot, like I think futures::channel::mpsc::Sender still does. It seems to strike the best compromise: locking happens at the slot level, and the buffer bound is implemented intuitively, since the number of items buffered corresponds exactly to the bound set by the client.

That being said, I think it is still susceptible to deadlocking (I haven't tested this) when all slots are reserved by clients that don't send.

Allowing a poll_ready to be undone can help relieve some pain here, but it only solves part of the problem. Given the lack of a central discussion about the design of the Sink API, combined with its poor popularity, I would love for a proper discussion to take place that helps to determine the best future for this interface. I personally think it's important to have an interface for this concept and would love to see consensus and adoption around it. Stabilizing Stream without Sink feels like stabilizing AsyncRead without AsyncWrite to me.

To move forward, I see the following as essential:

jonhoo commented 4 years ago

One thing we've played around with in tower (which has a very similar API) is to have poll_ready return a token that then has to be passed to start_send (call is the tower equivalent). If the token is dropped, it would free the reservation. Unfortunately, it's basically impossible to get that to work without generic associated types, or by doing nasty pointer tricks or allocation on every poll_ready, as far as I can tell. You can see some of the discussion we had about it on Discord.
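
For a single concrete type, the token idea is easy to sketch; the hard part described above is expressing it generically in a trait. The following is an illustration only, with hypothetical Channel, ReadyToken, and ToySender types. The token holds a reference-counted handle to the channel, which sidesteps the trait-level problem rather than solving it.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::task::Poll;

/// Toy bounded channel state (hypothetical, single-threaded).
struct Channel {
    free_slots: Cell<usize>,
    queue: RefCell<Vec<u32>>,
}

/// Proof that a slot is reserved. Dropping it unused frees the slot.
struct ReadyToken {
    channel: Rc<Channel>,
    used: bool,
}

impl Drop for ReadyToken {
    fn drop(&mut self) {
        // Refund the slot only if no item was sent with this token.
        if !self.used {
            self.channel.free_slots.set(self.channel.free_slots.get() + 1);
        }
    }
}

struct ToySender {
    channel: Rc<Channel>,
}

impl ToySender {
    fn poll_ready(&self) -> Poll<ReadyToken> {
        let free = self.channel.free_slots.get();
        if free == 0 {
            return Poll::Pending;
        }
        self.channel.free_slots.set(free - 1);
        Poll::Ready(ReadyToken { channel: Rc::clone(&self.channel), used: false })
    }

    /// Consumes the token, so sending without a reservation does not type-check.
    fn start_send(&self, mut token: ReadyToken, item: u32) {
        self.channel.queue.borrow_mut().push(item);
        token.used = true; // the slot is now occupied by `item`
    }
}

/// Returns the free slots after dropping a token, after sending, and the queue.
fn token_demo() -> (usize, usize, Vec<u32>) {
    let channel = Rc::new(Channel { free_slots: Cell::new(1), queue: RefCell::new(Vec::new()) });
    let sender = ToySender { channel: Rc::clone(&channel) };

    // Reserve, then give up: dropping the token refunds the slot.
    let after_drop = match sender.poll_ready() {
        Poll::Ready(token) => {
            drop(token);
            channel.free_slots.get()
        }
        Poll::Pending => unreachable!("one slot was free"),
    };

    // Reserve and send: the slot stays consumed.
    if let Poll::Ready(token) = sender.poll_ready() {
        sender.start_send(token, 7);
    }
    let after_send = channel.free_slots.get();
    let queued = channel.queue.borrow().clone();
    (after_drop, after_send, queued)
}
```

Nothing here stops a token from one sender being passed to another sender; a real design would have to rule that out as well.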

In my mind, there are three options:

  1. poll_ready is removed, and start_send is replaced with poll_start_send, which returns the element if the poll fails (its signature will be a little non-poll-y)
  2. poll_ready is kept in its current form, and disarm is added
  3. poll_ready and start_send are modified somehow to require that poll_ready is called first through the type system (something like a token)

I think 3 would be great, but I think it's not feasible without GAT. I think 1 is unfortunate because it means you must have the item you're going to send before seeing whether you can send it (this hinges on @carllerche's argument that this is important, which I'm inclined to believe). That leaves us with 2.
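
To show why option 1 forces the caller to hold an item first, here is a rough sketch of a combined call that hands the item back when the sink is full. SendPoll and ToySink are hypothetical names, and the real signature would also need a Context and an error type.

```rust
/// Outcome of a combined "check and send" call (hypothetical type).
#[derive(Debug, PartialEq)]
enum SendPoll<T> {
    /// The item was accepted.
    Sent,
    /// The sink was full; the item is handed back so the caller can retry later.
    Pending(T),
}

/// Toy sink with a fixed capacity (hypothetical).
struct ToySink {
    capacity: usize,
    items: Vec<u32>,
}

impl ToySink {
    /// There is no separate readiness check: the caller must already own `item`.
    fn poll_start_send(&mut self, item: u32) -> SendPoll<u32> {
        if self.items.len() < self.capacity {
            self.items.push(item);
            SendPoll::Sent
        } else {
            SendPoll::Pending(item)
        }
    }
}

/// Sends two items into a sink with room for one.
fn send_two_into_capacity_one() -> (SendPoll<u32>, SendPoll<u32>) {
    let mut sink = ToySink { capacity: 1, items: Vec::new() };
    (sink.poll_start_send(1), sink.poll_start_send(2))
}
```

The second call returns the item inside Pending, so the caller ends up buffering it, which is the cost that poll_ready was meant to avoid.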

Here's why I think option 2 isn't that bad: it will essentially only be used by people implementing low-level futures interfaces. "High-level" users will likely just be using async fn send, which hides the details from them, whatever they are. Low-level users will have to know to call poll_ready first, and will have to know the implications of that (that you reserve a slot), it's true, but maybe that's okay? Or maybe that's something we can fix with naming? Something like poll_start_send (poll_ready), complete_send (start_send), and abort_send (disarm)?