jpopesculian / twitter-v2-rs

Rust bindings for Twitter API v2
https://docs.rs/twitter-v2/latest
102 stars, 28 forks

example of using PaginableApiResult? #8

Open db48x opened 2 years ago

db48x commented 2 years ago

Could you include or point to some existing code that uses the PaginableApiResult to fetch a list, such as a user’s list of followers?

My first several attempts to write this worked, but they are super ugly. I feel like I am missing an obvious way to write it that would be readable or even beautiful. I used the governor crate to rate-limit the requests, but I haven’t yet done anything to retry transient errors; I feel like it’s ugly enough already.

jpopesculian commented 2 years ago

Hi @db48x, would you mind posting an example of your code and maybe highlighting the pain points? Maybe I can give some tips, or maybe there's a way to make the API more friendly.

db48x commented 2 years ago

After going back and forth a bit, here is what I currently have:

pub async fn get_all_pages<Auth, T>(rl: &DefaultRateLimiter,
                                initial_response: twitter_v2::Result<Option<ApiResponse<Auth, Vec<T>, ResultCountMeta>>>,
                                expected: Option<usize>) -> Result<Vec<T>>
where Auth: Authorization + Send + Sync + std::fmt::Debug,
      T: serde::de::DeserializeOwned + Clone + Debug + Send + Sync {
  let mut items: Vec<T> = match expected {
    Some(v) => Vec::with_capacity(v),
    _ => Vec::new(),
  };
  let mut response: twitter_v2::Result<Option<ApiResponse<Auth, Vec<T>, ResultCountMeta>>> = initial_response;
  loop {
    match response {
      // BUG(db48x): no way to retry if the initial request got a 429
      Err(e) => return Err(anyhow!(e)),
      Ok(None) => break,
      Ok(Some(ref r)) => {
        if let Some(new_items) = r.data() {
          items.extend_from_slice(&new_items);
        }
        if matches!(r.meta(), Some(meta) if meta.next_token().is_some()) {
          // Retry only the next-page request here; going around the outer
          // loop on a retry would append the current page's items again.
          response = loop {
            rl.until_ready().await;
            match r.next_page().await {
              Err(twitter_v2::Error::Request(ref e)) if e.is_timeout() || e.is_connect() || e.status() == Some(reqwest::StatusCode::TOO_MANY_REQUESTS) => {
                warn!(error=?e, "retrying due to request error");
              },
              Err(twitter_v2::Error::Api(ref e)) if e.status == reqwest::StatusCode::TOO_MANY_REQUESTS => {
                warn!(error=?e, "retrying due to 429 response");
              },
              // Success or a non-retryable error: handled by the outer match.
              other => break other,
            }
          };
        } else {
          break;
        }
      }
    }
  }
  Ok(items)
}

A caller would look like this:

pub async fn get_list_members<Auth>(api: &TwitterApi<Auth>, rl: &DefaultRateLimiter, list_id: u64) -> Result<Vec<User>>
where Auth: Authorization + Send + Sync + std::fmt::Debug {
  rl.until_ready().await;
  let response = api.get_list_members(list_id)
    .user_fields([UserField::Id, UserField::Name])
    .max_results(100)
    .send()
    .await
    .map(|response| Some(response));
  get_all_pages(rl, response, None)
    .await
    .with_context(|| format!("Failed to fetch list members for list {list_id}"))
}

Three things about it still annoy me:

I’m going to spend a few hours today seeing if I can solve the second problem by writing a function that takes a closure returning a response and calls the closure again if the response is one that can be retried. I am hoping that will simplify get_all_pages too.

db48x commented 2 years ago

Oh, the fourth thing that annoys me is the .map(|response| Some(response)) in the callers.
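
That closure can be shortened: `Some` is itself a function from `T` to `Option<T>`, so it can be passed to `Result::map` directly. A tiny std-only sketch of the idea, where `wrap` is a hypothetical helper name and `u32`/`String` merely stand in for the real response and error types:

```rust
/// Wraps the success value of a `Result` in `Some`, leaving errors untouched.
/// Hypothetical helper; the concrete types are placeholders for illustration.
fn wrap(result: Result<u32, String>) -> Result<Option<u32>, String> {
    // Equivalent to `result.map(|value| Some(value))`.
    result.map(Some)
}
```

In the callers this would presumably read `.await.map(Some)`, assuming the response type is otherwise unchanged.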

db48x commented 2 years ago

Here it is with retrying pulled out as a separate function. A little better perhaps, but still plenty of annoyances. Also, async closures are nightly-only; I had forgotten about that.

type DefaultRateLimiter = governor::RateLimiter<governor::state::NotKeyed, governor::state::InMemoryState, governor::clock::DefaultClock>;
type OptionResponse<Auth, T, Meta> = twitter_v2::Result<Option<twitter_v2::ApiResponse<Auth, Vec<T>, Meta>>>;
type Response<Auth, T, Meta> = twitter_v2::Result<twitter_v2::ApiResponse<Auth, Vec<T>, Meta>>;

pub async fn get_list_members<Auth>(api: &TwitterApi<Auth>, rl: &DefaultRateLimiter, list_id: u64) -> Result<Vec<User>>
where Auth: Authorization + Send + Sync + std::fmt::Debug {
  get_all_pages(rl,
                async || {
                  api.get_list_members(list_id)
                    .user_fields([UserField::Id, UserField::Name])
                    .max_results(100)
                    .send()
                    .await
                },
                None)
    .await
    .with_context(|| format!("Failed to fetch list members for list {list_id}"))
}

pub async fn retry<Auth, T, Meta, F, Fut>(rl: &DefaultRateLimiter, func: F) -> OptionResponse<Auth, T, Meta>
where Auth: Authorization + Send + Sync + std::fmt::Debug,
      T: serde::de::DeserializeOwned + Clone + Debug + Send + Sync,
      F: Fn() -> Fut,
      Fut: std::future::Future<Output = OptionResponse<Auth, T, Meta>> {
  loop {
    rl.until_ready().await;
    match func().await {
      Err(twitter_v2::Error::Request(ref e)) if e.is_timeout() || e.is_connect() => {
        warn!(error=?e, "retrying due to request error");
      },
      Err(twitter_v2::Error::Api(ref e)) if e.status == reqwest::StatusCode::TOO_MANY_REQUESTS => {
        warn!(error=?e, "retrying due to 429 response");
        // TODO(db48x): extend the twitter_v2 crate to expose the
        //   rate-limiting information provided by the twitter api
      },
      response => {
        if let Ok(Some(ref r)) = response {
          tracing::info!(url=%r.url(), "success");
        }
        return response;
      }
    }
  }
}

pub async fn get_all_pages<Auth, T, Meta, F, Fut>(rl: &DefaultRateLimiter,
                                                  func: F,
                                                  expected: Option<usize>) -> Result<Vec<T>>
where Auth: Authorization + Send + Sync + std::fmt::Debug,
      T: serde::de::DeserializeOwned + Clone + Debug + Send + Sync,
      Meta: twitter_v2::meta::PaginationMeta + serde::de::DeserializeOwned + Send + Sync,
      F: Fn() -> Fut,
      Fut: std::future::Future<Output = Response<Auth, T, Meta>> {
  let mut items: Vec<T> = match expected {
    Some(v) => Vec::with_capacity(v),
    _ => Vec::new(),
  };
  let mut response: OptionResponse<Auth, T, Meta> = retry(rl, async || func().await.map(Some)).await;
  loop {
    match response {
      Err(e) => return Err(anyhow!(e)),
      Ok(None) => break,
      Ok(Some(ref r)) => {
        if let Some(new_items) = r.data() {
          items.extend_from_slice(&new_items);
        }
        if matches!(r.meta(), Some(meta) if meta.next_token().is_some()) {
          response = retry(rl, async || r.next_page().await).await;
        } else {
          break;
        }
      }
    }
  }
  Ok(items)
}

Note: edited to remove the fourth annoyance.
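
Where async closures are unavailable, one possible workaround is to pass an ordinary closure that returns a future (`|| some_async_fn(..)` or `|| async { .. }`), which satisfies the same `F: Fn() -> Fut` bound used above. The following is a minimal, std-only sketch of that shape, not code from twitter_v2: `retry_with`, `FetchError`, `flaky_fetch`, `demo` and the tiny `block_on` executor are all hypothetical names made up for illustration, and the rate limiter is left out to keep it short.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical error type standing in for `twitter_v2::Error`.
#[derive(Debug, PartialEq)]
enum FetchError {
    /// Transient failure (for example a timeout or a 429); worth retrying.
    Retryable,
    /// Permanent failure; give up immediately.
    Fatal,
}

/// Calls `func` until it returns something other than a retryable error.
/// `func` is a plain closure returning a future, so no async closure is needed.
async fn retry_with<T, F, Fut>(func: F) -> Result<T, FetchError>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, FetchError>>,
{
    loop {
        match func().await {
            Err(FetchError::Retryable) => continue,
            // Success or a fatal error: hand it back to the caller.
            other => return other,
        }
    }
}

/// Hypothetical request: fails with a retryable error on the first two calls.
async fn flaky_fetch(attempts: &Cell<u32>) -> Result<&'static str, FetchError> {
    attempts.set(attempts.get() + 1);
    if attempts.get() < 3 {
        Err(FetchError::Retryable)
    } else {
        Ok("page 1")
    }
}

/// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor so the sketch runs without an async runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Runs the flaky request through `retry_with`; returns (result, attempts made).
fn demo() -> (Result<&'static str, FetchError>, u32) {
    let attempts = Cell::new(0);
    // Ordinary closure returning a future, instead of `async || { .. }`.
    let result = block_on(retry_with(|| flaky_fetch(&attempts)));
    (result, attempts.get())
}
```

In real code the `block_on` helper would be replaced by whatever runtime the application already uses, and the retry arm would wait on the rate limiter before calling `func` again.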