0xtlt / shopify_api

A shopify api client for rust
MIT License
12 stars 3 forks source link

Support for rate limiting #23

Open emilbonnek opened 2 weeks ago

emilbonnek commented 2 weeks ago

Thank you for making this!

I wonder if you have an example of how to stay within the Shopify rate limits given here: https://shopify.dev/docs/api/usage/rate-limits

I know there are crates that allows setting up client rate limits like this (Governor, reqwest-ratelimit), but it isn't entirely clear to me how this would be done in this case. But I wonder if there was maybe room for a feature in the crate itsealf that would enable something to take care of this. Like a feature called governor which would bring in automatic rate limitation. I am not a super advanced rust user so I don't know how concerns are normally split in this ecosystem and if this is something end users are expected to take care of themselves. But having an example of how to do it would be nice as I imagine it is something many users could be interested in.

DoIevenLift commented 2 days ago

Just to throw my 2 cents in (I am not a galaxy brain Rust user) - I think it should really be up to the end user on how they want to implement rate limiting rather than have the 'magic' inside the crate itself.

For staying within the limits - in my experience running a 10k+ SKU (hardly anything compared to some) Shopify site - revert everything you can to bulk actions. This will make the rate limits redundant where you can default to these anyway. If you haven't touched Shopify in a while they recently upgraded their product/item data model which gives some better features (more bulk action availability) - https://shopify.dev/docs/apps/build/graphql/migrate/new-product-model.

If you can't use bulk queries (there is still heaps of areas lacking) I am 90% certain the graphQL endpoint returns an X-remaining-limit header which you can read and fallback appropriately - you'll just need to pass an empty response_finder. This is how you could implement something which would be dirty and nasty.

let set_query_response_finder = vec![]; // blank is fine. This is required by the lib.

  let set_inventory_mutation: Result<GraphQLResponse, ShopifyAPIError> = shopify_client
    .graphql_query(query, &set_mutation, &set_query_response_finder)
    .await;

  match set_inventory_mutation {
    Ok(response) => {
      let throttle_status = response.extensions.cost.throttleStatus;

      if throttle_status.currentlyAvailable < 100.00 {
        // delay
        tokio::time::sleep(std::time::Duration::from_secs(20)).await;
      }

      Ok(())
    }
    Err(e) => {
      // logging
      tracing::error!("Error setting inventory: {}", e);

      Ok(())
    }
  }

Hope this helps!

emilbonnek commented 54 minutes ago

Thats a good consideration. But I think you can only have one bulk query executing at a time though right? I have several different endpoints that would need to do this, and I am sure there would be overlap in when they are called.

I ended up going a bit of a different route and not using this library in the end, but I solved my requirement of handling rate limitting by using the leaky-bucket crate. And then I check the cost of every query and when I call my client it calculates if the request might get rate limited. And as I go along I keep updating the current rate limiting status based of what Shopify admin returns.

I am not sure if the same could be done with the library, but here is a snippet of what I did in the end:


#[derive(Debug, Clone)]
pub struct Client {
    url: Url,
    client: reqwest_middleware::ClientWithMiddleware,
    rate_limiter: Arc<Mutex<RateLimiter>>,
}

/** We never want to get too close to the Shopify API rate limit, so we add a bit to our estimated cost */
const BUFFER_COST: usize = 40;

impl Client {
    /**
    Run a query against the Shopify GraphQL API.

    This function will automatically handle rate limiting.
    */
    pub async fn send<T, U>(
        &self,
        body: QueryBody<T>,
        estimated_query_cost: usize,
    ) -> Result<U, Error>
    where
        T: serde::Serialize + Debug,
        U: serde::de::DeserializeOwned,
    {
        let safe_cost = estimated_query_cost + BUFFER_COST;
        {
            let rate_limiter = self.rate_limiter.lock().await;
            let could_acquire = rate_limiter.try_acquire(safe_cost);
            if !could_acquire {
                tracing::trace!(
                    "Expected to be rate limited with query estimated cost {}, balance is {}",
                    safe_cost,
                    rate_limiter.balance()
                );
                rate_limiter.acquire(safe_cost).await;
            }
        }

        tracing::trace!("Making request to Shopify GraphQL API");
        let res = self
            .client
            .post(self.url.clone())
            .json(&body)
            .send()
            .await?;
        tracing::trace!("Parsing response from Shopify GraphQL API");
        let response_body: graphql_client::Response<U> = res.json().await?;

        let extensions = Extensions::try_from(response_body.extensions.clone());
        match extensions {
            Ok(extensions) => {
                let throttle_status = extensions.cost.throttle_status;
                let mut rate_limiter = self.rate_limiter.lock().await;
                *rate_limiter = build_rate_limiter(
                    throttle_status.maximum_available,
                    throttle_status.currently_available,
                    throttle_status.restore_rate,
                );
            }
            Err(err) => {
                tracing::warn!("Failed to parse extensions: {}", err);
            }
        }

        if let Some(errors) = response_body.errors {
            let errors = errors
                .iter()
                .map(|error| error.message.clone())
                .collect::<Vec<String>>()
                .join(", ");
            return Err(Error::Response(errors));
        }

        response_body.data.ok_or(Error::MissingData)
    }
}

pub fn build_client(
    shop_name: String,
    shopify_api_token: String,
) -> Result<Client, BuildClientError> {
    tracing::info!("Building reqwest client");
    let mut default_headers = reqwest::header::HeaderMap::new();
    default_headers.append(
        "X-Shopify-Access-Token",
        reqwest::header::HeaderValue::from_str(shopify_api_token.as_str())?,
    );

    let client = reqwest::Client::builder()
        .default_headers(default_headers)
        .build()?;

    let client = ClientBuilder::new(client)
        .with(TracingMiddleware::default())
        .build();

    const ASSUMED_MAX: usize = 2000;
    const ASSUMED_INITIAL: usize = ASSUMED_MAX;
    const ASSUMED_REFILL: usize = 100;
    let rate_limiter = build_rate_limiter(ASSUMED_MAX, ASSUMED_INITIAL, ASSUMED_REFILL);

    const SHOPIFY_API_VERSION: &str = "2024-07";
    let url = Url::parse(&format!(
        "https://{}.myshopify.com/admin/api/{}/graphql.json",
        shop_name, SHOPIFY_API_VERSION
    ))?;

    Ok(Client {
        url,
        client,
        rate_limiter: Arc::new(Mutex::new(rate_limiter)),
    })
}

/** The rate at which the rate limiter refills is always per second. */
pub static RATE: Lazy<Duration> = Lazy::new(|| Duration::from_secs(1));

/** Build a leaky bucket rate limiter with the given parameters. */
pub fn build_rate_limiter(max: usize, intial: usize, refill: usize) -> leaky_bucket::RateLimiter {
    leaky_bucket::RateLimiter::builder()
        .max(max)
        .initial(intial)
        .refill(refill)
        .interval(*RATE)
        .build()
}

I had a usecase of just running a bunch of mutations in paralell and with this they seem to wait for the balance to build up and avoid getting rate limited by the api server. Again, I am new to Rust so its entirely possible that some part of this way I have done it is a terrible idea, but for my immediate testing it seemed to work nicely.