hyperium / h2

HTTP 2.0 client & server implementation for Rust.
MIT License

Does the memory have residual memory? #648

Closed: silence-coding closed this issue 1 year ago

silence-coding commented 1 year ago

Is memory being left behind here? https://github.com/hyperium/h2/blob/294000c0745c64009151a1ab39978cd6f02dfd68/src/proto/streams/recv.rs#L612

I added a count_buf debug method to poll_data and found that self.buffer keeps accumulating data.

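    pub fn poll_data(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
        // TODO: Return error when the stream is reset
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Data(payload)) => {
                // Debug hook: periodically report what is still held in the shared buffer.
                self.count_buf();
                Poll::Ready(Some(Ok(payload)))
            }
            // ... remaining match arms unchanged ...
        }
    }

    // Debug helper (not part of h2): at most every 2 seconds, count the events still
    // stored in the connection-wide recv buffer. `self.time` is an `Instant` field
    // added for this experiment.
    fn count_buf(&mut self) {
        if self.time.elapsed().as_secs() < 2 {
            return;
        }
        self.time = Instant::now();
        let mut count = 0; // all buffered events
        let mut body_count = 0; // buffered DATA events only
        for (_key, slot) in self.buffer.slab.iter() {
            count += 1;
            if let Event::Data(_) = &slot.value {
                body_count += 1;
            }
        }
        println!("------------pull {}, {}", count, body_count);
    }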

Even after the client drops to sending only one request per second, the count in self.buffer never goes back down. [screenshot]

Reproduction scenario. Client:

// hyper 0.14-era API (Body::channel, hyper::Client); tokio with the "full" feature.
use std::time::{Duration, SystemTime};
use hyper::{Body, Client, Request};
use hyper::body::Bytes;
use hyper::client::HttpConnector;
use hyper::header::CONTENT_LENGTH;

#[tokio::main]
async fn main() {
    // One shared HTTP/2-only client, so requests are multiplexed over one connection.
    let client = hyper::client::Client::builder().http2_only(true).build_http();

    // Burst phase: 20 tasks, each sending 100 requests 50 ms apart.
    for _ in 0..20 {
        let client = client.clone();
        tokio::spawn(async move {
            for _ in 0..100 {
                send_req(client.clone()).await;
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
            println!("xxx exit");
        });
        tokio::time::sleep(Duration::from_millis(20)).await;
    }

    // Quiet phase: one request per second; the buffer count should shrink here.
    for _ in 0..100 {
        send_req(client.clone()).await;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    tokio::time::sleep(Duration::from_secs(100)).await;
}

async fn send_req(client: Client<HttpConnector>) {
    // Streaming body: between 0 and 999 two-byte chunks, then one empty chunk.
    let (mut sender, body) = Body::channel();
    let num = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis()
        % 1000;
    tokio::spawn(async move {
        for _ in 0..num {
            let _ = sender.send_data("ss".into()).await;
        }
        let _ = sender.send_data(Bytes::new()).await;
    });
    let request = Request::builder()
        .uri("http://127.0.0.1:8081")
        .header(CONTENT_LENGTH, (2 * num).to_string())
        .body(body)
        .unwrap();
    let response = client.request(request).await.unwrap();
    if !response.status().is_success() {
        println!("{:?}", response);
    }
}

Server:

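// Assumed imports (not shown in the original snippet); `Error` is taken to be hyper::Error.
use hyper::{Body, Client, Error, Request, Response, Version};

pub async fn handle(mut req: Request<Body>) -> Result<Response<Body>, Error> {
    // println!("xxx {:?}", req);
    // HTTP/1.1 leg: drain the request body and answer directly.
    if req.version() == Version::HTTP_11 {
        let _ = hyper::body::to_bytes(req.into_body()).await;
        return Ok(Response::new(Body::from("sss")));
    }

    // HTTP/2 leg: forward the request (body included) as HTTP/1.1 to its own URI,
    // which in this setup points back at this server; the upstream response is ignored.
    let client = Client::new();
    *req.version_mut() = Version::HTTP_11;
    let result = client.request(req).await;
    if result.is_err() {
        println!("{:?}", result);
    }
    let mut response = Response::new(Body::from("sss"));
    response
        .headers_mut()
        .insert("content-type", "application/json".parse().unwrap());
    Ok(response)
}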

In the bytes crate, a shared allocation is not freed as long as even one Bytes handle still references it, even an empty one. [screenshot]
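A std-only way to see this effect: `Bytes` shares its storage through reference counting, so a handle that views zero bytes can still keep the whole allocation alive. The sketch below models that with `Arc<Vec<u8>>`. `SharedSlice` is a hypothetical stand-in, not the bytes crate's API, and it assumes the empty handle was carved out of shared storage rather than created with `Bytes::new()`.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a `Bytes` handle: a view into shared, ref-counted storage.
struct SharedSlice {
    storage: Arc<Vec<u8>>,
    start: usize,
    end: usize,
}

impl SharedSlice {
    fn new(data: Vec<u8>) -> Self {
        let end = data.len();
        SharedSlice { storage: Arc::new(data), start: 0, end }
    }

    /// Like slicing a `Bytes`: no copy, just another reference to the same storage.
    fn slice(&self, start: usize, end: usize) -> Self {
        assert!(start <= end && end <= self.len());
        SharedSlice {
            storage: Arc::clone(&self.storage),
            start: self.start + start,
            end: self.start + end,
        }
    }

    fn len(&self) -> usize {
        self.end - self.start
    }
}

fn main() {
    let frame = SharedSlice::new(vec![0u8; 64 * 1024]);
    // An empty view, e.g. a zero-length DATA payload cut from a larger read buffer.
    let empty = frame.slice(0, 0);
    drop(frame);

    // The view holds no data, yet the full 64 KiB allocation is still alive.
    println!(
        "view len = {}, storage len = {}, refs = {}",
        empty.len(),
        empty.storage.len(),
        Arc::strong_count(&empty.storage)
    );
}
```

If such an empty handle sits in a buffer that is never drained, the memory it points into stays reserved with it.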

silence-coding commented 1 year ago

If an empty frame is pushed into the buffer after stream.remove has already run, it is never removed afterwards. https://github.com/hyperium/h2/blob/294000c0745c64009151a1ab39978cd6f02dfd68/src/proto/streams/counts.rs#L165-L167 https://github.com/hyperium/h2/blob/294000c0745c64009151a1ab39978cd6f02dfd68/src/proto/streams/recv.rs#L612

This happens because stream.pending_recv.push_back(&mut self.buffer, event); runs first, and only afterwards is the stream removed in transition_after, so the queued event is left with no stream to pop it. https://github.com/hyperium/h2/blob/294000c0745c64009151a1ab39978cd6f02dfd68/src/proto/streams/counts.rs#L119-L132
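To make the ordering problem concrete, here is a simplified, std-only model of a connection-wide receive buffer: one slab stores the events for every stream, and each stream owns only a queue of indices into it. The names echo h2's `Buffer`/`Deque`/`pending_recv`, but this is a hypothetical sketch, not h2's code. If an event is pushed and the stream's queue is then dropped without being drained, nothing can pop that slot again, so it stays occupied for the life of the connection.

```rust
/// One slot in the shared slab: the event plus a link to the next slot of the same stream.
struct Slot<T> {
    value: T,
    next: Option<usize>,
}

/// Connection-wide buffer shared by all streams (a simplified slab).
struct Buffer<T> {
    slots: Vec<Option<Slot<T>>>,
    free: Vec<usize>,
}

impl<T> Buffer<T> {
    fn new() -> Self {
        Buffer { slots: Vec::new(), free: Vec::new() }
    }

    fn insert(&mut self, slot: Slot<T>) -> usize {
        match self.free.pop() {
            Some(i) => {
                self.slots[i] = Some(slot);
                i
            }
            None => {
                self.slots.push(Some(slot));
                self.slots.len() - 1
            }
        }
    }

    fn remove(&mut self, i: usize) -> Slot<T> {
        let slot = self.slots[i].take().expect("slot must be occupied");
        self.free.push(i);
        slot
    }

    /// Number of occupied slots, roughly what `count_buf` was measuring.
    fn occupied(&self) -> usize {
        self.slots.iter().filter(|s| s.is_some()).count()
    }
}

/// Per-stream queue: only head/tail indices; the data itself lives in `Buffer`.
#[derive(Default)]
struct Deque {
    head: Option<usize>,
    tail: Option<usize>,
}

impl Deque {
    fn push_back<T>(&mut self, buf: &mut Buffer<T>, value: T) {
        let key = buf.insert(Slot { value, next: None });
        match self.tail {
            Some(tail) => buf.slots[tail].as_mut().unwrap().next = Some(key),
            None => self.head = Some(key),
        }
        self.tail = Some(key);
    }

    fn pop_front<T>(&mut self, buf: &mut Buffer<T>) -> Option<T> {
        let head = self.head?;
        let slot = buf.remove(head);
        self.head = slot.next;
        if self.head.is_none() {
            self.tail = None;
        }
        Some(slot.value)
    }
}

fn main() {
    let mut buffer: Buffer<Vec<u8>> = Buffer::new();

    // Normal path: a frame is queued and later read by the application.
    let mut live = Deque::default();
    live.push_back(&mut buffer, b"ss".to_vec());
    let _ = live.pop_front(&mut buffer);

    // Problem path: an empty DATA frame is queued, then the stream is removed
    // without draining its queue, so nothing holds the head index any more.
    let mut removed = Deque::default();
    removed.push_back(&mut buffer, Vec::new());
    drop(removed);

    println!("occupied slots: {}", buffer.occupied());
}
```

Because the slab belongs to the connection rather than to the stream, the orphaned slot (and any `Bytes` it holds) survives until the whole connection is dropped.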

Could we fix it as simply as this?

        // Proposed: drop the frame instead of buffering it for an already-released stream.
        if stream.is_released() {
            return Ok(());
        }
        // Push the frame onto the recv buffer
        stream.pending_recv.push_back(&mut self.buffer, event);
seanmonstar commented 1 year ago

Thank you for investigating so thoroughly!

Could we fix it as simply as this?

Possibly. If you try that change, does the test suite still pass? Does your example show the buffer count going back down?

silence-coding commented 1 year ago

is_released

In some scenarios is_released never becomes true, because the stream is still needed to write back the response.

silence-coding commented 1 year ago

The cleanest fix would be to check for leftover pending_recv entries in stream.remove(), but that change is too invasive. I chose another approach instead: discard incoming data when no RecvStream exists for the stream. [screenshot]
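A minimal sketch of why the two guards behave differently, assuming a server stream whose request body handle has been dropped while the response is still being written. The fields `recv_handle_alive` and `send_in_progress` are hypothetical and are not h2's actual fields or its actual change; the pending queue is a plain `Vec` here, so the model only shows which guard lets the frame in, not the slab leak itself.

```rust
/// Simplified per-stream state; field names are hypothetical, not h2's.
struct StreamState {
    /// Whether the application still holds the receive handle (the request body).
    recv_handle_alive: bool,
    /// Whether the response is still being written, which keeps the stream referenced.
    send_in_progress: bool,
    /// Frames waiting for the application to read them.
    pending_recv: Vec<Vec<u8>>,
}

impl StreamState {
    /// In this model a stream is released only once nothing references it.
    fn is_released(&self) -> bool {
        !self.recv_handle_alive && !self.send_in_progress
    }
}

/// Guard proposed earlier in the thread: skip buffering only for released streams.
fn recv_data_released_guard(stream: &mut StreamState, frame: Vec<u8>) {
    if stream.is_released() {
        return;
    }
    stream.pending_recv.push(frame);
}

/// Alternative: drop the frame whenever nobody is left to read it.
fn recv_data_handle_guard(stream: &mut StreamState, frame: Vec<u8>) {
    if !stream.recv_handle_alive {
        return;
    }
    stream.pending_recv.push(frame);
}

/// Request body already dropped by the handler, response still in flight.
fn server_stream() -> StreamState {
    StreamState {
        recv_handle_alive: false,
        send_in_progress: true,
        pending_recv: Vec::new(),
    }
}

fn main() {
    let mut a = server_stream();
    recv_data_released_guard(&mut a, Vec::new());

    let mut b = server_stream();
    recv_data_handle_guard(&mut b, Vec::new());

    println!(
        "released guard buffered {}, handle guard buffered {}",
        a.pending_recv.len(),
        b.pending_recv.len()
    );
}
```

Under these assumptions the `is_released` guard still buffers the frame, because the send side keeps the stream alive, while checking for a live receive handle discards it.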

maxbear1988 commented 1 year ago

@seanmonstar could you release a new version with this fix?

seanmonstar commented 1 year ago

v0.3.16 is out today :)

zhangweibin222 commented 1 year ago

v0.3.16 is out today :)

In which version was this memory leak introduced? Does h2 0.2.4 have it too?