hyperium / hyper

An HTTP library for Rust
https://hyper.rs
MIT License
14.07k stars 1.55k forks source link

hyper client happens cacel error which should retry #3673

Open xj524598 opened 1 month ago

xj524598 commented 1 month ago

Version List the version(s) of hyper, and any relevant hyper dependency (such as h2 if this is related to HTTP/2). hyper = { version = "1.1.0", features = ["full"] } hyper-util = { version = "0.1.3", features = ["full"] }

Platform The output of uname -a (UNIX), or version and 32 or 64-bit (Windows) linux 64

Description hyper client happens cacel error which should retry

[short summary of the bug]

  let mut connector = HttpConnector::new();
    connector.set_nodelay(true);
    //禁止使用该功能,其会在拿到IPV6和IPV4的时候优先尝试链接IPV6
    connector.set_happy_eyeballs_timeout(None);
    connector.set_connect_timeout(Some(Duration::from_millis(constant::constant_config::CONNECT_TIME_OUT)));
    // connector.set_send_buffer_size(Some(constant::constant_config::BUFFER_SIZE));
    // connector.set_recv_buffer_size(Some(constant::constant_config::BUFFER_SIZE));
    connector.set_reuse_address(true);

    //hyper_util::rt::TokioExecutor::new() 是否会导致上下文切换更多?
    let http_client = Client::builder(TokioExecutor::new())
        .http1_preserve_header_case(true)
        .http1_title_case_headers(false)
        .pool_idle_timeout(Duration::from_millis(constant::constant_config::CLIENT_IDLE_TIME_OUT)) //空闲链接
        // .pool_max_idle_per_host(constant::constant_config::MAX_IDLE_CONNECTIONS_PER_HOST_HTTP_ALL)
        // .executor()
        // .http1_max_buf_size(constant::constant_config::BUFFER_SIZE)
        .http1_ignore_invalid_headers_in_responses(false)
        .http1_allow_spaces_after_header_name_in_responses(false)
        .http1_allow_obsolete_multiline_headers_in_responses(false)
        .timer(TokioTimer::default())
        .pool_timer(TokioTimer::default())
        .build(connector);

    http_client

when use http_client send request,i got some error message as follows:

request::error("message", "Error { kind: SendRequest, source: Some(hyper::Error(Canceled, \"connection closed\")) }"
equest::error("message", "Error { kind: SendRequest, source: Some(hyper::Error(Canceled, \"connection was not ready\")) }")

retry_canceled_requests default is true, i think it should not happen these errors

xj524598 commented 1 month ago

i found the error msg from the drop func,just as follow if Envelope still has req, it means req never send ,should it has the chance to retry?

struct Envelope<T, U>(Option<(T, Callback<T, U>)>);

impl<T, U> Drop for Envelope<T, U> { fn drop(&mut self) { if let Some((val, cb)) = self.0.take() { cb.send(Err(( crate::Error::new_canceled().with("connection closed"), Some(val), ))); } } }

xj524598 commented 1 month ago

hyper client idle timeout is 20s,and server is tomcat,default keep alive timeout is 60s, it should not cause connection close error

darren-fu commented 4 weeks ago

interesting,keep eye on it

meiping05 commented 3 weeks ago
   async fn send_request(
        self,
        mut req: Request<B>,
        pool_key: PoolKey,
    ) -> Result<Response<hyper::body::Incoming>, Error> {
        let mut pooled = self.connection_for(pool_key).await?;

        req.extensions_mut()
            .get_mut::<CaptureConnectionExtension>()
            .map(|conn| conn.set(&pooled.conn_info));

        if pooled.is_http1() {
            if req.version() == Version::HTTP_2 {
                warn!("Connection is HTTP/1, but request requires HTTP/2");
                return Err(e!(UserUnsupportedVersion));
            }

            if self.config.set_host {
                let uri = req.uri().clone();
                req.headers_mut().entry(HOST).or_insert_with(|| {
                    let hostname = uri.host().expect("authority implies host");
                    if let Some(port) = get_non_default_port(&uri) {
                        let s = format!("{}:{}", hostname, port);
                        HeaderValue::from_str(&s)
                    } else {
                        HeaderValue::from_str(hostname)
                    }
                    .expect("uri host is valid header value")
                });
            }

            // CONNECT always sends authority-form, so check it first...
            if req.method() == Method::CONNECT {
                authority_form(req.uri_mut());
            } else if pooled.conn_info.is_proxied {
                absolute_form(req.uri_mut());
            } else {
                origin_form(req.uri_mut());
            }
        } else if req.method() == Method::CONNECT {
            authority_form(req.uri_mut());
        }

        let fut = pooled.send_request(req);
        //.send_request_retryable(req)
        //.map_err(ClientError::map_with_reused(pooled.is_reused()));

        // If the Connector included 'extra' info, add to Response...
        let extra_info = pooled.conn_info.extra.clone();
        let fut = fut.map_ok(move |mut res| {
            if let Some(extra) = extra_info {
                extra.set(res.extensions_mut());
            }
            res
        });

        // As of futures@0.1.21, there is a race condition in the mpsc
        // channel, such that sending when the receiver is closing can
        // result in the message being stuck inside the queue. It won't
        // ever notify until the Sender side is dropped.
        //
        // To counteract this, we must check if our senders 'want' channel
        // has been closed after having tried to send. If so, error out...
        if pooled.is_closed() {
            return fut.await;
        }

        let res = fut.await?;

        // If pooled is HTTP/2, we can toss this reference immediately.
        //
        // when pooled is dropped, it will try to insert back into the
        // pool. To delay that, spawn a future that completes once the
        // sender is ready again.
        //
        // This *should* only be once the related `Connection` has polled
        // for a new request to start.
        //
        // It won't be ready if there is a body to stream.
        if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
            drop(pooled);
        } else if !res.body().is_end_stream() {
            //let (delayed_tx, delayed_rx) = oneshot::channel::<()>();
            //res.body_mut().delayed_eof(delayed_rx);
            let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
                // At this point, `pooled` is dropped, and had a chance
                // to insert into the pool (if conn was idle)
                //drop(delayed_tx);
            });

            self.exec.execute(on_idle);
        } else {
            // There's no body to delay, but the connection isn't
            // ready yet. Only re-insert when it's ready
            let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());

            self.exec.execute(on_idle);
        }

        Ok(res)
    }
        // As of futures@0.1.21, there is a race condition in the mpsc
        // channel, such that sending when the receiver is closing can
        // result in the message being stuck inside the queue. It won't
        // ever notify until the Sender side is dropped.
        //
        // To counteract this, we must check if our senders 'want' channel
        // has been closed after having tried to send. If so, error out...
        if pooled.is_closed() {
            return fut.await;
        }

then

Is mpsc::UnboundedSender dropped causing this problem? Any suggestions for modifications?