hyperium / http-body

Asynchronous HTTP body trait
MIT License
129 stars 50 forks source link

Add Body::poll_progress #90

Open sfackler opened 1 year ago

sfackler commented 1 year ago

As described in https://github.com/hyperium/hyper/issues/3121, this allows server implementations to abort body writes even if the client is not reading data.

LucioFranco commented 1 year ago

@sfackler would it be possible to include a timeout example with this? Or maybe add that example in the doc comment I am curious to see how this would be used.

sfackler commented 1 year ago

We could add a TimeoutBody wrapper to http-body-util, though that wold require making tokio an optional dependency. I can update the PR tonight.

LucioFranco commented 1 year ago

Maybe even at least an example so tokio doesn't need to be a public dep.

sfackler commented 1 year ago

Here's a TimeoutBody implementation (untested):

#[pin_project]
pub struct TimeoutBody<B> {
    #[pin]
    inner: B,
    #[pin]
    timer: Sleep,
    timeout: Duration,
    waiting: bool,
}

impl<B> Body for TimeoutBody<B>
where
    B: Body,
{
    type Data = B::Data;
    type Error = TimeoutErro<B::Error>;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        if let Poll::Ready(o) = self.as_mut().project().inner.poll_frame(cx) {
            *this.waiting = false;
            return Poll::Ready(o.map(|r| r.map_err(TimeoutError::Inner)));
        }

        self.is_healthy(cx)?;
        Poll::Pending
    }

    fn is_healthy(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), Self::Error> {
        let this = self.project();

        if !*this.waiting {
            this.timer.reset(Instant::now() + *this.timeout);
            *this.waiting = true;
        }

        if this.timer.poll(cx).is_ready() {
            return Err(TimeoutError::TimedOut);
        }

        Ok(())
    }

    fn is_end_stream(&self) -> bool {
        self.inner.is_end_stream()
    }

    fn size_hint(&self) -> SizeHint {
        self.inner.size_hint()
    }
}

pub enum TimeoutError<E> {
    Inner(E),
    TimedOut,
}
sfackler commented 1 year ago

@seanmonstar thoughts on this?

sfackler commented 9 months ago

I've updated this to move from poll_healthy to a more general poll_progress, like what boats described for the Stream trait. In addition to the error reporting case I opened this to handle, the same motivations apply here as well.

olix0r commented 6 months ago

We (Linkerd) are interested in moving this proposal forward.

I've been testing this with patched versions of http-body 0.3 and Hyper 0.14 and the results are promising.

I'm using this to implement a middleware that enforces a progress timeout to cancel stuck streams.

What does the process look like for finalizing this proposal? Is there anything I can do to help?

seanmonstar commented 6 months ago

Thanks for trying it out, @olix0r! I'm glad to have at least 2 use cases for something so fundamental. We can move this forward.

In my prep for doing so, I went back and read the previous conversations, and also the poll_progress post. I think the problem and solution that withoutboats describes is similar, but different enough that perhaps we shouldn't use the same name/mechanism. That's because, this feature isn't describing making progress on the body separate from producing a frame. It's rather to propagate cancelation while waiting on backpressure to clear up.

It feels closer to oneshot::Sender::poll_closed(). Should we change the name here to poll_closed() or poll_canceled()?

At the same time, I'm writing up a longer blog post about this feature, I'll share a draft with you soon.

sfackler commented 6 months ago

That seems like a reasonable-enough name to me, though it might be a bit strange to have poll_closed return Result<(), Error>? That's why the original name in the PR was poll_healthy.

seanmonstar commented 6 months ago

Hm, thanks for pointing that out. It made me think through the problem a bit more I at first figured we could just make it poll_closed() -> Poll<()>, like the oneshot sender. But that has some things to work out:

sfackler commented 6 months ago

A default implementation that just returns Poll::Ready(Ok(())) is correct, and equivalent to the current state of the world.

I don't think it would indicate closure, just drive any background IO (like poll_progress does). For convenience, it allows errors to be returned directly, but an error out of poll_healthy or whatever we call it would be equivalent to that error being returned from poll_frame.

sfackler commented 6 months ago

In that sense, it is really pretty similar to Boats's poll_progress, just fallible.

We could make it infallible and force the error to come out of poll_frame but that just seems like it'd make the implementation more annoying for no real benefit.

seanmonstar commented 6 months ago

So you find that it was needed to make background progress, too? I didn't think that was a goal of the method. Just to detect closure while a frame isn't needed, such as by polling a Sleep.

sfackler commented 6 months ago

Polling a sleep is background progress IMO! :)

My initial use case was purely around detecting disconnects/timeouts and things like that, but Boats's post on poll_progress made me feel like there's no reason you couldn't have some other body implementation that wanted to do some background IO. For example, if you're proxying you may want to internally pull data from the upstream connection while the downstream connection is idle.

seanmonstar commented 6 months ago

That's a fair point. Perhaps the naming is fine then. Or at least, worth considering if poll_progress is better than the alternatives.

Now, I think one more remaining question is about return type. I do think a poll method should return Poll<T>. But what does each value mean? Specifically, what's the difference between Ready(Ok(())) and Pending? It does feel like their different. The default will just return Ok(()). And if it were checking a Sleep, I assume it'd return Pending. What should the caller (such as inside hyper) do with that information? Whatever we determine for that should end up documented on the method.

sfackler commented 6 months ago

I think that Ready(Ok(()) would mean "I'm done making progress in the background". If we don't require poll_progress to be fused, the caller would need to remember that and not call it again which seems pretty annoying TBH. Pending means that there's more background progress work to be done later, the same as any other future.

In practice, the only thing that callers would actually look for is the Ready(Err) case (see https://github.com/hyperium/hyper/pull/3169).

EDIT: Actually, I think we have to require that it's fused to be able to use it properly.

sfackler commented 6 months ago

Added a few bits of docs on return values, and poll_progress implementations to http-body-util combinators.

olix0r commented 3 months ago

@seanmonstar Are you comfortable with this PR? Is there anything else to consider?

seanmonstar commented 3 months ago

I believe generally yes. I got part way through a write-up to publish as a blog post, to explore the area more and get more feedback, since I think it's a sufficiently interesting change to some fundamental crates. I still plan to finish that up, I just had to pause as I dealt with some other contracting work.