hyperium / hyper

An HTTP library for Rust
https://hyper.rs

Response with large data seems to reset the connection #2893

Open AmbitionXiang opened 2 years ago

AmbitionXiang commented 2 years ago

Hi there. When I try to fetch a large amount of data (e.g., 3 GB) from the server, the fetch fails. The code is as follows.

Client side

Cargo.toml

[package]
name = "http_client_demo"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"]}
hyper = { version = "0.14.19", features = ["client", "server", "http2", "tcp"]}
http = "0.2.8"

src/main.rs

use hyper::{Body, Client, Uri};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::builder().http2_only(true).build_http::<Body>();
    let uri = "http://127.0.0.1:65413/arbitrary".parse::<Uri>().unwrap();

    let res = client.get(uri).await.unwrap();
    let data_bytes = hyper::body::to_bytes(res.into_body()).await;
    if let Ok(bytes) = data_bytes {
        println!("bytes len = {:?}", bytes.to_vec().len());
        Ok(())
    } else {
        println!("error = {:?}", data_bytes.err().unwrap());
        Ok(())
    }
}

Server side

Cargo.toml

[package]
name = "http_server_demo"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"]}
hyper = { version = "0.14.19", features = ["client", "server", "http2", "tcp"]}
http = "0.2.8"
futures = { version = "0.3.4" }
thiserror = "1.0.15"

src/main.rs

use std::net::{Ipv4Addr, SocketAddr, TcpListener};
use std::task::{Context, Poll};
use hyper::{server::conn::AddrIncoming, service::Service, Body, Request, Response, Server};
use futures::future;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ShuffleError {
    #[error("internal fail")]
    InternalError,
}

pub type StdResult<T, E> = std::result::Result<T, E>;

#[tokio::main]
async fn main() {
    let bind_ip = Ipv4Addr::new(127, 0, 0, 1);
    let bind_port = 65413;
    let conn = TcpListener::bind(SocketAddr::from((bind_ip, bind_port))).unwrap();
    Server::from_tcp(conn).unwrap().serve(ShuffleSvcMaker).await.unwrap();
}

type ShuffleServer = Server<AddrIncoming, ShuffleSvcMaker>;

struct ShuffleService;

impl Service<Request<Body>> for ShuffleService {
    type Response = Response<Body>;
    type Error = ShuffleError;
    type Future = future::Ready<StdResult<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<StdResult<(), Self::Error>> {
        Ok(()).into()
    }

    fn call(&mut self, _req: Request<Body>) -> Self::Future {
        // Allocate the whole 3 GB payload up front and send it as a single body.
        let body = Body::from(vec![1u8; 3_000_000_000]);
        match Response::builder().status(200).body(body) {
            Ok(rsp) => future::ok(rsp),
            Err(_) => future::err(ShuffleError::InternalError),
        }
    }
}

struct ShuffleSvcMaker;

impl<T> Service<T> for ShuffleSvcMaker {
    type Response = ShuffleService;
    type Error = ShuffleError;
    type Future = future::Ready<StdResult<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<StdResult<(), Self::Error>> {
        Ok(()).into()
    }

    fn call(&mut self, _: T) -> Self::Future {
        future::ok(ShuffleService)
    }
}

The result seen on the client is:

error = hyper::Error(Body, Error { kind: Reset(StreamId(1), CANCEL, Remote) })

How can I solve this?

malaire commented 2 years ago

How much RAM do you have? That code uses a lot of RAM.

This uses 3 GB RAM:

let data_bytes = hyper::body::to_bytes(res.into_body()).await;

This copies the data and uses another 3 GB RAM, for a total of 6 GB RAM:

bytes.to_vec().len()

Then on the server this uses 3 or 6 GB RAM (I'm not sure whether it copies the data or not):

let body = Body::from(vec![1u8; 3_000_000_000]);

I'm not sure whether that error can result from running out of RAM, but to reduce RAM usage you can first use bytes.len() instead of bytes.to_vec().len() to avoid the copy. Streaming the data instead of keeping it all in RAM would reduce RAM usage even further.
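The streaming idea can be sketched with the standard library alone. This is only an illustration, not hyper's API: `count_streamed_bytes` is a hypothetical helper, and `io::repeat(..).take(..)` is a stand-in for a response body that arrives in pieces. hyper 0.14 offers a comparable per-chunk loop through `HttpBody::data()`, but that is not shown here.

```rust
use std::io::{self, Read};

/// Counts the bytes produced by `reader` using one fixed-size buffer,
/// so memory use stays constant no matter how large the payload is.
/// (Hypothetical helper, not part of hyper.)
fn count_streamed_bytes<R: Read>(mut reader: R) -> io::Result<u64> {
    let mut buf = [0u8; 64 * 1024]; // reused for every chunk
    let mut total = 0u64;
    loop {
        match reader.read(&mut buf) {
            Ok(0) => return Ok(total), // end of stream
            Ok(n) => total += n as u64,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}

fn main() -> io::Result<()> {
    // Stand-in for a response body: 10 MiB of the byte 1, generated
    // lazily instead of being allocated up front.
    let payload_len: u64 = 10 * 1024 * 1024;
    let body = io::repeat(1u8).take(payload_len);
    let total = count_streamed_bytes(body)?;
    println!("bytes len = {}", total);
    Ok(())
}
```

Only the running total and the 64 KiB buffer are kept in memory, in contrast to `to_bytes`, which buffers the whole body before returning.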

AmbitionXiang commented 2 years ago

@malaire My server has 377 GB of RAM, so that doesn't seem to be the reason.

sunnuozhou commented 2 years ago

It seems that hyper uses an i32 somewhere and an overflow triggers the error: the code only fails once the size in Body::from(vec![1u8; 3_000_000_000]) is 2147483648 (2^31) or larger, e.g. Body::from(vec![1u8; 2147483648]).
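The threshold is consistent with the largest value an `i32` can hold, 2147483647 (2^31 - 1). The sketch below only illustrates that arithmetic. It does not reproduce hyper or h2 internals, and `fits_in_i32` and `wrapping_cast` are hypothetical helper names.

```rust
use std::convert::TryFrom;

/// Returns true when `len` is representable as an `i32`.
fn fits_in_i32(len: u64) -> bool {
    i32::try_from(len).is_ok()
}

/// Shows what a truncating `as i32` cast yields (two's-complement wraparound).
fn wrapping_cast(len: u64) -> i32 {
    len as i32
}

fn main() {
    // The last size that fits, the first that does not, and the size from the report.
    let sizes: [u64; 3] = [2_147_483_647, 2_147_483_648, 3_000_000_000];
    for len in sizes.iter() {
        println!(
            "{} fits in i32: {}, as i32: {}",
            len,
            fits_in_i32(*len),
            wrapping_cast(*len)
        );
    }
}
```

Both 2147483648 and 3_000_000_000 fail the checked conversion, and a truncating cast turns them into negative numbers, which is the kind of value that could derail a length or window calculation.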

seanmonstar commented 2 years ago

Ah, thanks for reminding me. This is likely because of https://github.com/hyperium/h2/issues/471.