rwf2 / multer

An async parser for multipart/form-data content-type in Rust
MIT License

StreamBuffer can use unlimited memory if the underlying stream is always ready #62

Closed zl-gcharnock closed 5 months ago

zl-gcharnock commented 5 months ago

This is probably quite hard to encounter in the real world with a real network; I hit it while testing with artificial streams built from futures::stream::repeat. I was trying to write a unit test showing that if the network dropped, I was properly cleaning up half-written files. Unfortunately, I struggled to get the test to produce a half-written file because the entire body was being buffered in memory.

I think the problem is the following function in buffer.rs. If self.stream never returns Pending, self.buf.extend_from_slice(&data) keeps being called and more and more data is buffered up. Breaking out of the loop once the buffer reaches some decent size might be a good idea, though I'm not sure how "decent size" should be defined here.

I fully understand if you don't agree that this is a problem; I appreciate that it is very much an edge case.

    pub fn poll_stream(&mut self, cx: &mut Context<'_>) -> Result<(), crate::Error> {
        if self.eof {
            return Ok(());
        }

        loop {
            match self.stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(data))) => {
                    self.stream_size_counter += data.len() as u64;

                    if self.stream_size_counter > self.whole_stream_size_limit {
                        return Err(crate::Error::StreamSizeExceeded {
                            limit: self.whole_stream_size_limit,
                        });
                    }

                    self.buf.extend_from_slice(&data)
                }
                Poll::Ready(Some(Err(err))) => return Err(err),
                Poll::Ready(None) => {
                    self.eof = true;
                    return Ok(());
                }
                Poll::Pending => return Ok(()),
            }
        }
    }
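For illustration, here is a standalone sketch of the fix suggested above: stop draining the source once a cap on buffered bytes is reached, so the consumer gets a chance to run. The function name, the cap, and the iterator-based model of an "always ready" stream are all hypothetical, not part of multer's API:

```rust
// Hypothetical cap on how many bytes we accumulate before yielding
// control back to the consumer; not a real multer constant.
const MAX_BUFFERED: usize = 64 * 1024;

// Models draining an always-ready source into `buf`, but stopping once
// the cap is reached instead of looping until the source goes Pending.
fn drain_with_cap(chunks: &mut impl Iterator<Item = Vec<u8>>, buf: &mut Vec<u8>) {
    while buf.len() < MAX_BUFFERED {
        match chunks.next() {
            Some(data) => buf.extend_from_slice(&data),
            None => break, // EOF
        }
    }
}
```

Even with an effectively infinite source, the buffer stays bounded at roughly the cap plus one chunk.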
SergioBenitez commented 5 months ago

There's a limit check right above the line you're indicating as problematic. Have you set up data limits? They exist to prevent exactly this.

zl-gcharnock commented 5 months ago

That's not quite solving the right problem. In this use case we have authenticated, trusted users who need to upload some pretty huge files that generally do not fit into RAM, or even onto the attached local disk in the case of EC2 instances. It's important that the handler code receive the incoming bytes in chunks so it can perform some IO action on them, for example writing to network-attached storage or sending the bytes to S3. Rust's async Stream abstraction works really well for this use case and was actually a major reason for rewriting this part of the system in Rust.

In practice this usually works, I expect because real networks are noisy: the inner stream returns Pending every so often, allowing the bytes that have accumulated in self.buf to be forwarded to the IO action and sent somewhere.

I only hit this issue because I tested with a synthetic stream, in order to write a unit test making sure things are properly cleaned up if the stream cuts off unexpectedly. So if the user was 15 GB through a 20 GB file, we shouldn't leave 15 GB of junk behind. But no matter how big I made the request body, I never observed any IO actions; I did observe a lot of memory being used, which was pretty confusing.

Anyway, like I said, this is probably a pretty niche issue that I fully understand might not be worth fixing.

For anyone else who hits this issue, here is a workaround: wrap the body stream in this adapter, which inserts a spurious Pending every so often, and you will start getting chunks.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;

#[pin_project::pin_project]
struct InsertPendingStream<S> {
    #[pin]
    stream: S,
    since_last_pending: u8,
}

impl<S> Stream for InsertPendingStream<S>
where
    S: Stream,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        if *this.since_last_pending > 10 {
            *this.since_last_pending = 0;
            // Wake ourselves before returning the spurious Pending, so the
            // executor re-polls this task. Returning Pending without
            // arranging a wake-up could stall the stream indefinitely.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }

        match this.stream.poll_next(cx) {
            Poll::Ready(r) => {
                *this.since_last_pending += 1;
                Poll::Ready(r)
            }
            Poll::Pending => {
                // A genuine Pending from the inner stream resets the counter.
                *this.since_last_pending = 0;
                Poll::Pending
            }
        }
    }
}
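The core bookkeeping of the adapter, counting ready results and injecting a yield every N items, can be shown without the futures machinery. This standalone sketch (all names hypothetical) models the same logic over a plain iterator, with Step::Yield standing in for the spurious Poll::Pending:

```rust
// Hypothetical std-only model of the adapter's counting logic.
struct YieldEvery<I> {
    inner: I,
    since_last_yield: u8,
    threshold: u8,
}

// Step::Yield plays the role of the spurious Poll::Pending above.
enum Step<T> {
    Item(T),
    Yield,
    Done,
}

impl<I: Iterator> YieldEvery<I> {
    fn new(inner: I, threshold: u8) -> Self {
        Self { inner, since_last_yield: 0, threshold }
    }

    fn next_step(&mut self) -> Step<I::Item> {
        if self.since_last_yield > self.threshold {
            self.since_last_yield = 0;
            return Step::Yield;
        }
        match self.inner.next() {
            Some(item) => {
                self.since_last_yield += 1;
                Step::Item(item)
            }
            None => Step::Done,
        }
    }
}
```

With a threshold of 10, the adapter hands out eleven items (counter values 1 through 11) before the counter exceeds the threshold and a yield is injected.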
zl-gcharnock commented 5 months ago

I've just realised something else. I was a bit worried that with a very fast network racing a slow disk, bytes would build up in self.buf. But that's not actually the case. The network isn't racing the disk, it's racing the loop {}: for a real network, you are copying whatever is in the kernel's receive buffer into self.buf and then sending those bytes on their way. At that point it doesn't matter if they go to a slow disk; the kernel buffer will fill up again, and that should apply back pressure, because that's how TCP is supposed to work. So it's almost certainly fine in all realistic cases.

I'll close this issue, maybe that workaround will be useful for anyone else trying to do something weird like I was.