seanmonstar / reqwest

An easy and powerful Rust HTTP Client
https://docs.rs/reqwest
Apache License 2.0
9.72k stars 1.09k forks source link

Async Rate Limiting #491

Open theduke opened 5 years ago

theduke commented 5 years ago

A lot of APIs have rate limits, and manually implementing a rate limiter over and over again is quite annoying to me.

There was some previous discussion in #169 but that was focused on a sync context.

@seanmonstar would you be open to a built in rate limiter? I imagine it working like this:

A simple implementation without prioritization could cause cascading delays though. A fix for that would be a request queue.

The complexity for this would not be too high and I would really love that functionality.

An alternative solution to this would be to have a async middleware feature with a pre-request hook that can return a future. That way a third party crate could supply the functionality easily, but async middleware is probably a larger topic. (remotely related discussion regarding sync hooks here: #155)

seanmonstar commented 5 years ago

We've been working on a general middleware stack, and have a form of rate-limiting here: https://github.com/tower-rs/tower/blob/master/tower-limit/src/rate/service.rs

It'd probably be useful to make adjustments there.

edrevo commented 4 years ago

@seanmonstar I've seen examples on how to integrate the hyper client and tower. Is it also possible to do that with reqwest and tower? Is there any example code or documentation that I can use as a hint to build that integration?

Thanks for all of your work with warp, hyper, reqwest & co. It's really great to use your libs!

seanmonstar commented 4 years ago

@edrevo the easiest way is to just use tower::service_fn:

let client = reqwest::Client::new(); // or use builder

let svc = tower::service_fn(move |req| {
    client.execute(req)
});
ardeaf commented 4 years ago

@seanmonstar Would you be able to provide a quick example on how to convert one of the reqwest examples into a rate-limited one?

ie, from the docs:

let client = reqwest::Client::new();
let res = client.post("http://httpbin.org/post")
    .body("the exact body that is sent")
    .send()
    .await?;

How would we refactor that into a rate limited request using tower-limit?

Mathspy commented 4 years ago

A bit late but here's an example of the above using tower if someone looks at this issue in the future

use reqwest::{Body, Method, Request, Url};
use std::time::Duration;
use tower::Service;
use tower::ServiceExt;

let client = reqwest::Client::new();
let mut svc = tower::ServiceBuilder::new()
    .rate_limit(100, Duration::new(10, 0)) // 100 requests every 10 seconds
    .service(tower::service_fn(move |req| client.execute(req)));

let mut req = Request::new(Method::POST, Url::parse("http://httpbin.org/post")?);
*req.body_mut() = Some(Body::from("the exact body that is sent"));

let res = svc.ready_and().await?.call(req).await?;

Definitely a bit more verbose but it works like a charm and the client and service needs to be setup only once anyway

flexabyte commented 4 years ago

I've not had any success getting this working by storing the service in a struct... I can't find a proper way to declare the type of the Service. If I do:

struct ServiceStruct {
    service: dyn tower::Service<Response=HttpResponse,Error=HttpError,Future=Pin<Box<Result<HttpResponse,HttpError>>>>,
... more fields ...
}

let mut svc = tower::ServiceBuilder::new()
    .rate_limit(1, Duration::new(0, 1600)) // 1 request every 1600ms
    .service(tower::service_fn(move |req| client.execute(req)));

ServiceStruct { service: svc, ... more fields }    

But the type definition for tower::Service in the struct is wrong, and I can't find any usable documentation for declaring the type... has anyone had success with this?

UPDATE:

I had the wrong end of the stick with this one, but I got there in the end. For anyone looking to do something similar, here is the adapted code for storing a rate limit service in a struct:

use reqwest::{Body, Method, Request, Url};
use std::time::Duration;
use tower::Service;
use tower::ServiceExt;
use tower_limit::rate::RateLimit;
use tower_util::ServiceFn;

struct ServiceStruct<T> {
    service: RateLimit<ServiceFn<T>>,
}

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let service = tower::ServiceBuilder::new()
        .rate_limit(100, Duration::new(10, 0)) // 100 requests every 10 seconds
        .service(tower::service_fn(move |req| client.execute(req)));

    let mut service_struct = ServiceStruct{service};

    let mut req = Request::new(Method::POST, Url::parse("http://httpbin.org/post").unwrap());
    *req.body_mut() = Some(Body::from("the exact body that is sent"));

    let res = service_struct.service.ready_and().await.unwrap().call(req).await.unwrap();
    println!("res: {:?}", res);
}
svc-93 commented 3 years ago

We've been working on a general middleware stack, and have a form of rate-limiting here: https://github.com/tower-rs/tower/blob/master/tower-limit/src/rate/service.rs

It'd probably be useful to make adjustments there.

This link seems to be non-functional now. Has there been any progress on this feature request? It'd be useful to have this available as an out-of-box addition!

liamdawson commented 3 years ago

@svc-93 I think that would be https://github.com/tower-rs/tower/blob/master/tower/src/limit/rate/service.rs now

eute commented 3 years ago

A bit late but here's an example of the above using tower if someone looks at this issue in the future

use reqwest::{Body, Method, Request, Url};
use std::time::Duration;
use tower::Service;
use tower::ServiceExt;

let client = reqwest::Client::new();
let mut svc = tower::ServiceBuilder::new()
    .rate_limit(100, Duration::new(10, 0)) // 100 requests every 10 seconds
    .service(tower::service_fn(move |req| client.execute(req)));

let mut req = Request::new(Method::POST, Url::parse("http://httpbin.org/post")?);
*req.body_mut() = Some(Body::from("the exact body that is sent"));

let res = svc.ready_and().await?.call(req).await?;

Definitely a bit more verbose but it works like a charm and the client and service needs to be setup only once anyway

How to use this in a multithreaded context where each thread uses the same tower service and needs to be constrained by the same rate limit?

ilyazub commented 2 years ago

How to use this in a multithreaded context where each thread uses the same tower service and needs to be constrained by the same rate limit?

@eute Probably by using the rt-multi-thread feature of tokio.

# Cargo.toml
tokio = { version = "1.7.1", features = ["rt-multi-thread", "macros"] }
// src.main.rs

#[tokio::main]
async fn main() {}

The default "RuntimeFlavor" of tokio::main macros is multi_threaded. Source.

sergeyshaykhullin commented 2 years ago

@ilyazub I am getting the same problem as @eute. Service has to be mutable, so i can't share exclusive reference and operate concurrently

deknowny commented 2 years ago

If you are still finding a solution for client-side rate limiting, I made a little crate raliguard with Semaphore implementing fixed window algorithm to control execution times per a period. Here is an example of asynchronous usage where the semaphore is shared between threads. It also supports any async/await backends

use std::{thread, sync, time};

use raliguard::Semaphore;

// Create a semaphore with restriction `5 tasks per 1 second`
let original_sem = Semaphore::new(5, time::Duration::from_secs(1));

// Make it sharable between threads (or you can share between tasks)
let shared_sem = sync::Arc::new(
    sync::Mutex::new(original_sem)
);

// Spawn 15 threads
for _ in 0..15 {
    let cloned_sem = shared_sem.clone();
    let thread = thread::spawn(move || {
        // Lock mutex for exclusive usage
        let mut local_sem = cloned_sem.lock().unwrap();

        // Get required delay
        let calculated_delay = local_sem.calc_delay();

        // Release mutex, make semaphore available to use in another threads
        drop(local_sem);

        // If delay exists, sleep it
        if let Some(delay) = calculated_delay {
            thread::sleep(delay);
        }

        // Here you can do your requests or another stuff
    });
}

// Sleep 1 second. Only 10 threads will be completed at this time
// (first 5 with no delay and another 5 after a second)
thread::sleep(time::Duration::from_secs(1));
Mathspy commented 2 years ago

This is much much easier to do now after reqwest v0.11.11

Client now implements service so you can do this to return a service powered by reqwest from a function and store it into a struct

use std::time::Duration;

use reqwest::{Error, Request, Response};
use tower::Service;

fn example() -> impl Service<Request, Response = Response, Error = Error> {
    let client = reqwest::Client::new();

    tower::ServiceBuilder::new()
        .rate_limit(10, Duration::from_secs(5))
        .service(client)
}

struct Example<S>
where
    S: Service<Request, Response = Response, Error = Error>,
{
    service: S,
}

fn main() {
    let example = Example { service: example() };
}

If you need your service to be usable from multiple different contexts the easiest thing is to clone your service and pass the clones into those contexts. However, RateLimit prevents services from being cloned because then each service would have its own rate limits and that defeats the purpose of rate limits, so we can use a Buffer service (which happens to have a different error type) like so:

diff --git a/src/main.rs b/src/main.rs
index d39d616..8547d43 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,19 +1,20 @@
 use std::time::Duration;

 use reqwest::{Error, Request, Response};
-use tower::Service;
+use tower::{BoxError, Service};

-fn example() -> impl Service<Request, Response = Response, Error = Error> {
+fn example() -> impl Service<Request, Response = Response, Error = BoxError> {
     let client = reqwest::Client::new();

     tower::ServiceBuilder::new()
+        .buffer(100)
         .rate_limit(10, Duration::from_secs(5))
         .service(client)
 }

 struct Example<S>
 where
-    S: Service<Request, Response = Response, Error = Error>,
+    S: Service<Request, Response = Response, Error = BoxError>,
 {
     service: S,
 }
nirvana-msu commented 1 year ago

Do I understand correctly, that RequestBuilder pattern is not compatible with using tower service? RequestBuilder is initialized with Client - which is fine when using reqwest on its own, but not when we need to use tower that wraps the client..

Setting body may be as easy as assigning to *req.body_mut(), however properly updating headers (as well as other things builder helps with) can be a lot more involved.

@seanmonstar is there a chance RequestBuilder could be decoupled from Client somehow? The only place it actually uses the client field is within send() method... building the request itself doesn't need the client at all. Obviously main concern here would be API backwards compatibility - otherwise what I want could be achieved e.g. by just wrapping client field with Option.

As a workaround I can actually use some dummy client when instantiating RequestBuilder, and then instead of using send() I can get request with build(), and then send using tower service. It's just slightly annoying that I need to pass in the dummy client which will never be used.

nirvana-msu commented 1 year ago

RateLimit prevents services from being cloned because then each service would have its own rate limits and that defeats the purpose of rate limits, so we can use a Buffer service like so

Doesn't this introduce a race condition?

RateLimit does not reserve any capacity upon poll_ready, unlike e.g. ConcurrencyLimit. The only thing that prevents race condition when using RateLimit by itself is that poll_ready/call methods take &mut self, so you can't invoke those concurrently from different tasks without cloning (and this presumably is the main reason Clone wasn't implemented for them, unlike was done for ConcurrencyLimit).

Putting Buffer service in front with bound larger than 1 sidesteps that - it is now possible for multiple (up to bound) tasks to obtain permission from poll_ready, all without actually reserving any rate limit capacity - so when they proceed to call, there may not be enough capacity for them all (only capacity for one is guaranteed).

xylonx commented 1 year ago

In tower::ServiceBuilder, I have found an example declaring as "containing rate limiting, in-flight request limits, and a channel-backed, clonable Service". below is the the example codes, or you can find it here

ServiceBuilder::new()
    .buffer(5)
    .concurrency_limit(5)
    .rate_limit(5, Duration::from_secs(1))
    .service(svc);

Based on that, I have written my channel-based wrapper and it indeed works. I post it below and hope it can help someone. If it introduces some race conditions, please be easy to point it :)

use std::time::Duration;

use anyhow::{anyhow, Result};
use futures::{
    channel::{mpsc, oneshot},
    SinkExt, StreamExt,
};
use log::error;
use reqwest::{Client, Request, Response};
use tower::{Service, ServiceExt};

#[derive(Debug)]
pub struct LimitedRequestClient {
    request_tx: mpsc::Sender<(Request, oneshot::Sender<Result<Response>>)>,
}

impl LimitedRequestClient {
    /// [buffer] -> [concurrency req pool] - :{rate limit}: -> client.call()
    pub fn new(
        client: Client,
        channel_buffer_size: usize,
        request_buffer_size: usize,
        max_concurrency_number: usize,
        rate_limit_number: u64,
        rate_limit_duration: Duration,
    ) -> Self {
        let (tx, rx) =
            mpsc::channel::<(Request, oneshot::Sender<Result<Response>>)>(channel_buffer_size);

        tokio::spawn(async move {
            let service = tower::ServiceBuilder::new()
                .buffer(request_buffer_size)
                .concurrency_limit(max_concurrency_number)
                .rate_limit(rate_limit_number, rate_limit_duration)
                .service(client.clone());
            rx.for_each_concurrent(max_concurrency_number, move |(req, resp_tx)| {
                let mut inner_service = service.clone();
                async move {
                    let resp = match inner_service.ready().await {
                        Ok(srv) => match srv.call(req).await {
                            Ok(r) => Ok(r),
                            Err(e) => Err(anyhow!(
                                "LimitedRequestClient: service call request failed: {}",
                                e
                            )),
                        },
                        Err(e) => Err(anyhow!("LimitedRequestClient: service ready failed: {}", e)),
                    };
                    match resp_tx.send(resp) {
                        Ok(_) => (),
                        Err(_) => error!(
                            "LimitedRequestClient: send resp to resp_tx failed: channel closed"
                        ),
                    }
                }
            })
            .await // keep it in-flight
        });
        Self { request_tx: tx }
    }

    pub async fn request(&self, req: Request) -> Result<Response> {
        let (tx, rx) = oneshot::channel::<Result<Response>>();
        self.request_tx.clone().send((req, tx)).await?;
        rx.await?
    }
}