durch / rust-s3

Rust library for interfacing with S3 API compatible services
MIT License
519 stars 198 forks

Connection pooling support #347

Closed Noah-Kennedy closed 11 months ago

Noah-Kennedy commented 1 year ago

Currently, every request results in a new TCP connection to the origin, which is very expensive due to the multiple round trips required to establish each connection.

Normally, a crate like this would store the hyper client used by a given bucket within the bucket object, allowing hyper to maintain a connection pool to the origin. I assume that surf works similarly. For origins that support h2, this would mean only one outbound TCP connection is needed to communicate with the origin, keeping latencies minimal.
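A rough sketch of that pattern (hyper 0.14 plus hyper-tls; `PooledBucket` and its methods are illustrative names, not this crate's API): the client is built once, stored next to the bucket state, and reused for every request so hyper's pool can keep connections warm.

    // Hypothetical sketch, not this crate's actual API.
    use hyper::client::HttpConnector;
    use hyper::{Body, Client, Request, StatusCode};
    use hyper_tls::HttpsConnector;

    struct PooledBucket {
        endpoint: String,
        // One shared client per bucket; hyper pools idle connections internally.
        client: Client<HttpsConnector<HttpConnector>>,
    }

    impl PooledBucket {
        fn new(endpoint: impl Into<String>) -> Self {
            let https = HttpsConnector::new();
            Self {
                endpoint: endpoint.into(),
                client: Client::builder().build::<_, Body>(https),
            }
        }

        async fn head_object(&self, key: &str) -> Result<StatusCode, hyper::Error> {
            let uri = format!("{}/{}", self.endpoint, key);
            let req = Request::head(uri).body(Body::empty()).expect("valid request");
            // Reuses an idle pooled connection (or a single h2 stream) when available.
            self.client.request(req).await.map(|resp| resp.status())
        }
    }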

Depending on the details of the operation and the properties of the network path between the client and the server, this may lead to substantial reductions in operation latency.

This is currently one of my major blockers for using this crate, as requests I make via this crate are very slow due to the unnecessary round trips.

Noah-Kennedy commented 1 year ago

Theoretically, it would be best to just allow the user to pass a tower::Service<Request> to the bucket in the constructor. This would allow a user to supply any arbitrary HTTP client to the bucket and directly tune all of its parameters (e.g. h2 settings).
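A sketch of what that could look like (the `GenericBucket` name and constructor are hypothetical): the bucket is generic over any tower::Service, so the caller builds and tunes the HTTP client and simply hands it over. Since hyper's Client already implements tower::Service<Request<Body>>, it could be passed in directly.

    // Hypothetical sketch, not an existing rust-s3 API.
    use http::{Request, Response};
    use hyper::Body;
    use tower::{Service, ServiceExt};

    struct GenericBucket<S> {
        name: String,
        http_client: S,
    }

    impl<S> GenericBucket<S>
    where
        S: Service<Request<Body>, Response = Response<Body>> + Clone,
    {
        // The caller constructs and tunes the client (h2 settings, pool size,
        // timeouts); the bucket only stores and reuses it.
        fn with_client(name: impl Into<String>, http_client: S) -> Self {
            Self { name: name.into(), http_client }
        }

        async fn send(&self, req: Request<Body>) -> Result<Response<Body>, S::Error> {
            // oneshot waits for the service to be ready, then dispatches the request.
            self.http_client.clone().oneshot(req).await
        }
    }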

dgrr commented 1 year ago

Why not just use a reqwest::Client per Bucket? If the client needs some specific HTTP/2 parameters (which I doubt), they could be added to the bucket configuration. I could write a PR if moving to reqwest is accepted. I don't see why a library like this needs hyper.
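For reference, a sketch of the reqwest variant (again with made-up names): reqwest::Client is a cheap-to-clone handle over an internal connection pool, so storing one per Bucket already gives pooled, reused connections.

    // Hypothetical sketch, not this crate's actual API.
    use std::time::Duration;

    struct ReqwestBucket {
        endpoint: String,
        // Cloning a reqwest::Client is cheap; clones share the same pool.
        client: reqwest::Client,
    }

    impl ReqwestBucket {
        fn new(endpoint: impl Into<String>) -> reqwest::Result<Self> {
            let client = reqwest::Client::builder()
                .connect_timeout(Duration::from_secs(60))
                .build()?;
            Ok(Self { endpoint: endpoint.into(), client })
        }

        async fn head_object(&self, key: &str) -> reqwest::Result<reqwest::StatusCode> {
            let url = format!("{}/{}", self.endpoint, key);
            // Connections are reused from the client's pool across calls.
            Ok(self.client.head(url).send().await?.status())
        }
    }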

Noah-Kennedy commented 1 year ago

> Why not just use a reqwest::Client per Bucket? If the client needs some specific HTTP/2 parameters (which I doubt), they could be added to the bucket configuration. I could write a PR if moving to reqwest is accepted. I don't see why a library like this needs hyper.

This feels like the easiest approach; I only brought up the tower idea because I saw that this crate used to use reqwest but dropped it for being too heavyweight.

Noah-Kennedy commented 1 year ago

IMO there's also a reasonable question about whether this crate needs to support anything other than hyper. I can see a potential argument for supporting the sync clients, but the async-std ecosystem is basically dead at this point, and it may not be worth the added complexity and maintenance burden to continue supporting it.

stalkerg commented 1 year ago

It would be easy to add such a thing to https://github.com/ZitaneLabs/rust-s3-async

Noah-Kennedy commented 1 year ago

@stalkerg are you affiliated with that fork, or with the fork it was forked from? It would be nice for the one from aalekhpatel07 not to be classed as a fork on GitHub, so that issues can be filed and discussions can happen.

stalkerg commented 1 year ago

@Noah-Kennedy no :( but you can make a new fork and enable the issue tracker. It's MIT licensed.

lyokha commented 1 year ago

This patch works for me:

diff --git a/s3/src/bucket.rs b/s3/src/bucket.rs
index eb2c1a1..61dbf6f 100644
--- a/s3/src/bucket.rs
+++ b/s3/src/bucket.rs
@@ -47,6 +47,10 @@ use crate::utils::{error_from_response_data, PutStreamResponse};
 use http::header::HeaderName;
 use http::HeaderMap;

+use hyper::Client;
+use hyper::client::HttpConnector;
+use hyper_tls::HttpsConnector;
+
 pub const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880);

 const DEFAULT_REQUEST_TIMEOUT: Option<Duration> = Some(Duration::from_secs(60));
@@ -89,6 +93,7 @@ pub struct Bucket {
     pub extra_headers: HeaderMap,
     pub extra_query: Query,
     pub request_timeout: Option<Duration>,
+    pub client: Client<HttpsConnector<HttpConnector>>,
     path_style: bool,
     listobjects_v2: bool,
 }
@@ -433,6 +438,7 @@ impl Bucket {
             extra_headers: HeaderMap::new(),
             extra_query: HashMap::new(),
             request_timeout: DEFAULT_REQUEST_TIMEOUT,
+            client: Self::make_client(DEFAULT_REQUEST_TIMEOUT),
             path_style: false,
             listobjects_v2: true,
         })
@@ -457,6 +463,7 @@ impl Bucket {
             extra_headers: HeaderMap::new(),
             extra_query: HashMap::new(),
             request_timeout: DEFAULT_REQUEST_TIMEOUT,
+            client: Self::make_client(DEFAULT_REQUEST_TIMEOUT),
             path_style: false,
             listobjects_v2: true,
         })
@@ -470,6 +477,7 @@ impl Bucket {
             extra_headers: self.extra_headers.clone(),
             extra_query: self.extra_query.clone(),
             request_timeout: self.request_timeout,
+            client: self.client.clone(),
             path_style: true,
             listobjects_v2: self.listobjects_v2,
         }
@@ -483,6 +491,7 @@ impl Bucket {
             extra_headers,
             extra_query: self.extra_query.clone(),
             request_timeout: self.request_timeout,
+            client: self.client.clone(),
             path_style: self.path_style,
             listobjects_v2: self.listobjects_v2,
         }
@@ -496,6 +505,7 @@ impl Bucket {
             extra_headers: self.extra_headers.clone(),
             extra_query,
             request_timeout: self.request_timeout,
+            client: self.client.clone(),
             path_style: self.path_style,
             listobjects_v2: self.listobjects_v2,
         }
@@ -509,6 +519,7 @@ impl Bucket {
             extra_headers: self.extra_headers.clone(),
             extra_query: self.extra_query.clone(),
             request_timeout: Some(request_timeout),
+            client: Self::make_client(Some(request_timeout)),
             path_style: self.path_style,
             listobjects_v2: self.listobjects_v2,
         }
@@ -522,6 +533,7 @@ impl Bucket {
             extra_headers: self.extra_headers.clone(),
             extra_query: self.extra_query.clone(),
             request_timeout: self.request_timeout,
+            client: self.client.clone(),
             path_style: self.path_style,
             listobjects_v2: false,
         }
@@ -2066,6 +2078,7 @@ impl Bucket {
     /// async code may instead await with a timeout.
     pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
         self.request_timeout = timeout;
+        self.client = Self::make_client(timeout);
     }

     /// Configure bucket to use the older ListObjects API
@@ -2231,6 +2244,13 @@ impl Bucket {
     pub fn request_timeout(&self) -> Option<Duration> {
         self.request_timeout
     }
+
+    fn make_client(request_timeout: Option<Duration>) -> Client<HttpsConnector<HttpConnector>> {
+        let mut http_connector = HttpConnector::new();
+        http_connector.set_connect_timeout(request_timeout);
+        let https_connector = HttpsConnector::new();
+        Client::builder().build::<_, hyper::Body>(https_connector)
+    }
 }

 #[cfg(test)]
diff --git a/s3/src/request/tokio_backend.rs b/s3/src/request/tokio_backend.rs
index 75cb211..f13f2dd 100644
--- a/s3/src/request/tokio_backend.rs
+++ b/s3/src/request/tokio_backend.rs
@@ -3,9 +3,7 @@ extern crate md5;

 use bytes::Bytes;
 use futures::TryStreamExt;
-use hyper::client::HttpConnector;
-use hyper::{Body, Client};
-use hyper_tls::HttpsConnector;
+use hyper::Body;
 use maybe_async::maybe_async;
 use std::collections::HashMap;
 use time::OffsetDateTime;
@@ -39,39 +37,6 @@ impl<'a> Request for HyperRequest<'a> {
             Err(e) => return Err(e),
         };

-        #[cfg(any(feature = "use-tokio-native-tls", feature = "tokio-rustls-tls"))]
-        let mut tls_connector_builder = native_tls::TlsConnector::builder();
-
-        #[cfg(not(any(feature = "use-tokio-native-tls", feature = "tokio-rustls-tls")))]
-        let tls_connector_builder = native_tls::TlsConnector::builder();
-
-        if cfg!(feature = "no-verify-ssl") {
-            cfg_if::cfg_if! {
-                if #[cfg(feature = "use-tokio-native-tls")]
-                {
-                    tls_connector_builder.danger_accept_invalid_hostnames(true);
-                }
-
-            }
-
-            cfg_if::cfg_if! {
-                if #[cfg(any(feature = "use-tokio-native-tls", feature = "tokio-rustls-tls"))]
-                {
-                    tls_connector_builder.danger_accept_invalid_certs(true);
-                }
-
-            }
-        }
-        let tls_connector = tokio_native_tls::TlsConnector::from(tls_connector_builder.build()?);
-
-        let mut http_connector = HttpConnector::new();
-        http_connector.set_connect_timeout(self.bucket.request_timeout);
-        // let https_connector = HttpsConnector::from((http_connector, tls_connector));
-
-        let https_connector = HttpsConnector::new();
-
-        let client = Client::builder().build::<_, hyper::Body>(https_connector);
-
         let method = match self.command.http_verb() {
             HttpMethod::Delete => http::Method::DELETE,
             HttpMethod::Get => http::Method::GET,
@@ -91,7 +56,7 @@ impl<'a> Request for HyperRequest<'a> {

             request.body(Body::from(self.request_body()))?
         };
-        let response = client.request(request).await?;
+        let response = self.bucket.client.request(request).await?;

         if cfg!(feature = "fail-on-err") && !response.status().is_success() {
             let status = response.status().as_u16();

This moves creation of the Hyper client into the Bucket constructor. Previously, 100 tasks running in parallel would blow up CPU usage on my laptop; now 1000 parallel tasks run fast and reliably.

I would also recommend applying the patch from #343 to eliminate random bucket errors.

lyokha commented 1 year ago

I didn't notice that make_client() ignored the value of the request timeout: the HttpConnector carrying the timeout was built but never passed into the HttpsConnector. Below is a fixed version.

    fn make_client(request_timeout: Option<Duration>) -> Client<HttpsConnector<HttpConnector>> {
        let mut http_connector = HttpConnector::new();
        // Allow https URIs to pass through to the wrapping TLS connector.
        http_connector.enforce_http(false);
        http_connector.set_connect_timeout(request_timeout);
        // Build the TLS connector on top of the configured HttpConnector so
        // the connect timeout is actually honored.
        let https_connector = HttpsConnector::new_with_connector(http_connector);
        Client::builder().build(https_connector)
    }
durch commented 11 months ago

https://github.com/durch/rust-s3/commit/7d7c57b5cc967299ebf338337c7fb3b68e57c03c implements client reuse for the with-tokio feature. Gonna close this one; feel free to reopen if it does not meet expectations.