mattsse / voyager

crawl and scrape web pages in rust
Apache License 2.0
740 stars 35 forks

Handling of rate-limiting websites #6

Open oblique opened 3 years ago

oblique commented 3 years ago

I have a case where a webpage is behind Cloudflare, and after a number of requests I get a 429 error. The response contains a Retry-After header whose value is the number of seconds to wait before sending the next request.

Currently, is there a way to handle this?

mattsse commented 3 years ago

429 is a rate-limit error, meaning too many requests. You can add a delay between every request:

let config = CrawlerConfig::default().allow_domain_with_delay(
    "news.ycombinator.com",
    // add a delay between requests
    RequestDelay::Fixed(std::time::Duration::from_millis(3_000)),
);
oblique commented 3 years ago

Unfortunately it is not that simple. Through trial and error I found that some websites have multiple levels of rate-limiting. For example, in my case I managed to make ~60 requests with a 2-second delay before getting a 429, and after increasing the delay to 5 seconds I managed to make ~400 requests before getting a 429 again.

What if the crawler had pause/resume functionality, so users could implement their own rate-limiting? Would that be acceptable?
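(Editor's note: the multi-level behavior described above could also be handled with an adaptive delay that backs off on every 429 and relaxes after successes. A minimal sketch; the type and method names here are illustrative and not part of voyager:)

```rust
use std::time::Duration;

/// Illustrative adaptive delay: doubles on each 429, halves on success,
/// clamped to a [min, max] range.
struct AdaptiveDelay {
    current: Duration,
    min: Duration,
    max: Duration,
}

impl AdaptiveDelay {
    fn new(min: Duration, max: Duration) -> Self {
        Self { current: min, min, max }
    }

    /// Call after receiving a 429: back off exponentially, capped at `max`.
    fn on_rate_limited(&mut self) -> Duration {
        self.current = (self.current * 2).min(self.max);
        self.current
    }

    /// Call after a successful response: relax back toward `min`.
    fn on_success(&mut self) -> Duration {
        self.current = (self.current / 2).max(self.min);
        self.current
    }
}
```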

mattsse commented 3 years ago

I see. The Response contains the status code and response headers, which you can check for rate limiting. One way to handle custom timeouts when you receive a 429 is the client.crawl function (see "inject async calls"), which takes an async closure where you can enforce a custom delay, for example with futures::Delay (take a look at the current RequestDelay impl for reference). This way you could add an additional Retry state. However, this might not be very ergonomic. I'll gladly accept PRs that improve this, maybe an additional schedule function, for example.
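(Editor's note: the header check mentioned above can be sketched as a small helper. The function name is illustrative; note that HTTP's Retry-After may also carry an HTTP-date, which this sketch ignores:)

```rust
use std::time::Duration;

/// Parse the delay-seconds form of a `Retry-After` header value into a
/// `Duration`. Returns `None` for the HTTP-date form or malformed input.
fn retry_after_delay(value: &str) -> Option<Duration> {
    value.trim().parse::<u64>().ok().map(Duration::from_secs)
}
```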

oblique commented 3 years ago

I ended up using client.crawl as you mentioned.

This is my crawl async function:

async fn crawl_url(
    client: Arc<Client>,
    url: &str,
    state: State,
) -> Result<(reqwest::Response, Option<State>)> {
    static LIMITER: Lazy<Limiter> = Lazy::new(Limiter::new);

    loop {
        LIMITER.until_ready().await;

        let resp = client.get(url).send().await?;

        if resp.status().as_u16() == 429 {
            if let Some(retry_after) = resp
                .headers()
                .get("retry-after")
                .and_then(|v| v.to_str().ok())
                .and_then(|s| s.parse().ok())
            {
                let dur = Duration::from_secs(retry_after);
                LIMITER.update_sleep_for(dur).await;
            }
            continue;
        }

        return Ok((resp, Some(state)));
    }
}

This is how I inject it:

    crawler.crawl(|client| {
        crawl_url(
            client,
            &format!("https://example.com/?p={}", page + 1),
            State::Page(page + 1),
        )
    });

And this is my rate-limiter that handles 429:

use governor::clock::{Clock, MonotonicClock};
use governor::state::direct::NotKeyed;
use governor::state::InMemoryState;
use governor::{Quota, RateLimiter};
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::{sleep, sleep_until, Instant};

pub struct Limiter {
    rate_limiter: RateLimiter<NotKeyed, InMemoryState, MonotonicClock>,
    sleep_until: RwLock<Instant>,
    clock: MonotonicClock,
}

impl Limiter {
    pub fn new() -> Self {
        let clock = MonotonicClock::default();
        let quota = Quota::with_period(Duration::from_secs(1)).unwrap();
        let rate_limiter = RateLimiter::direct_with_clock(quota, &clock);

        Limiter {
            rate_limiter,
            sleep_until: RwLock::new(Instant::now()),
            clock,
        }
    }

    pub async fn update_sleep_until(&self, tm: Instant) {
        let mut sleep_until = self.sleep_until.write().await;

        if *sleep_until < tm {
            *sleep_until = tm;
        }
    }

    pub async fn update_sleep_for(&self, dur: Duration) {
        let tm = Instant::now() + dur;
        self.update_sleep_until(tm).await;
    }

    pub async fn until_ready(&self) {
        loop {
            let tm = *self.sleep_until.read().await;

            if Instant::now() < tm {
                sleep_until(tm).await;
            }

            match self.rate_limiter.check() {
                Ok(()) => break,
                Err(tm) => {
                    let dur = tm.wait_time_from(self.clock.now());
                    sleep(dur).await;
                }
            }
        }
    }
}
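(Editor's note: update_sleep_until keeps the later of the stored and requested deadlines, so concurrent tasks can only extend the backoff, never shorten it. That merging rule in isolation, with an illustrative helper name:)

```rust
use std::time::{Duration, Instant};

/// The deadline-merging rule used by `update_sleep_until`, in isolation:
/// always keep the later of the two instants.
fn merge_deadline(current: Instant, requested: Instant) -> Instant {
    current.max(requested)
}
```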