josh-ashkinaze / plurals

A package for multi-agent, persona-based, pluralistic AI deliberation.
https://josh-ashkinaze.github.io/plurals/
5 stars 1 forks source link

Improve parallel requests handling #30

Open josh-ashkinaze opened 1 month ago

josh-ashkinaze commented 1 month ago

In the implementation I have, I am sending parallel LiteLLM completion requests through Native Python libraries. But some providers have these super low rate limits.

So I see a few paths forward and I am wondering if you can explore this for next week:

Q1: First, is it the case that LiteLLM's batch completion endpoint handles rate limit errors?

If the answer to Q1 is Yes:

If the answer to Q1 is No: I don't think it's worth re-factoring to that since it would have no benefit. Instead, there are some low-lying options:

Experiment: What if we just set a high num_retries as the default for kwargs for Ensemble? num_retries is LiteLLM using Tenacity to wait on rate limit errors.

Propose/implement on development: If you find it still has problems then just make a proposal for our own waiting scheme using tenacity itself (the library litellm is using to wait on requests)

narenedara commented 1 month ago

Was looking into this and I don't think batch completion handles rate limit errors. I am going to try complete the NO solution but I was wondering how I would be able to test it. Would I just make multiple requests in a row or is there a better way to do it?

On Wed, Jul 10, 2024 at 12:07 AM Joshua Ashkinaze @.***> wrote:

In the implementation I have, I am sending parallel LiteLLM completion requests through Native Python libraries. I really thought LiteLLM sleeps on rate limit errors but I got an error sometimes with non openai providers.

So I see a few paths forward and I am wondering if you can explore this for next week:

Q1: First, is it the case that LiteLLM's batch completion endpoint handles rate limit errors?

If the answer to Q1 is Yes:

  • Task: Try to switch our current implementation to simply use batch completion.

If the answer to Q1 is No: I don't think it's worth re-factoring to that since it would have no benefit. Instead, there are some low-lying options:

1.

What if we just set a high num_retries as the default for kwargs for Ensemble? num_retries is LiteLLM using Tenacity to wait on rate limit errors. 2.

If you find it still has problems then just use make a proposal for our waiting scheme using tenacity itself (the library litellm is using to wait on requests)

— Reply to this email directly, view it on GitHub https://github.com/josh-ashkinaze/plurals/issues/30, or unsubscribe https://github.com/notifications/unsubscribe-auth/A2ZEBKNTJVICBJHLMOSEQH3ZLSXMTAVCNFSM6AAAAABKUBW5KGVHI2DSMVQWIX3LMV43ASLTON2WKOZSGM4TSNRZHAYDMMQ . You are receiving this because you were assigned.Message ID: @.***>

josh-ashkinaze commented 1 month ago

Regarding Q1: Are you looking at this code? I think you may be right. It seems like for default completions max_retries is equal to zero and I don't see batch_completion changing that value (though it's a lot of code so idk)

https://github.com/BerriAI/litellm/blob/e201b06d9c2ba4580ca9b1a51f35e12b08588ec6/litellm/main.py#L2747

Regarding replicating an error: No, use the Ensemble method because that will launch them all at once. I weirdly was not able to replicate this error with OpenAI even doing 200 agents...so I don't know if this is because (1) my account has some kind of high rate limit level; (2) I experienced a blip of some sort with Anthropic. Try with Anthropic though. Make the prompt "say hello" and be sure to do kwargs = {'max_tokens':2}

Anyway even if this is an edge case it's good to handle.

Regarding solution if answer to Q1 is no, as you suspect Can you look into this code from LiteLLM? You may have to mess around with the RetryPolicy to get a good amount. https://litellm.vercel.app/docs/routing

Also, it seems they have an open bug about this so I think there may be something interesting. https://github.com/BerriAI/litellm/issues/527

Updated to P0 and let's discuss tomorrow what path forward is I am going to upgrade this to a p0 thing to do since I think this is pretty important actually and the path forward seems clear. Reading LiteLLM's thread, we may as well just handle ourselves.

So there are two options:

  1. We can refactor agent.process so it can take in a kwargs argument. Up until now, we assume an Agent's kwags wouldn't depend on the Structure (which is a logical thing IMO) so I do not want to do this.

  2. Less disruptive, we just change ensemble.process so that instead of calling agent.process in parallel we create a wrapper function process_with_retry that just calls agent.process but uses Tenacity to wait on the RateLimit. We can even look at LitelLM's defaults....when you set num_retries=4 in LiteLLM I know it uses Tenacity but what are the exact parameters? Let's just stick with LiteLLM's defaults.

Okay so I made a draft implementation (be sure to test this though!!) and I think it would look something like this. See how we catch error first and tell user to step up num_retries and then raise it.

import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from LiteLLM.exceptions import RateLimitError

class Ensemble(AbstractStructure):
    def process(self):
        """
        Process the task through an ensemble of agents, each handling the task independently with retries.
        """
        original_task = self.agents[0].original_task_description

        for _ in range(self.cycles):
            with ThreadPoolExecutor() as executor:
                futures = []
                for agent in self.agents:
                    previous_responses_str = ""
                    agent.combination_instructions = self.combination_instructions
                    futures.append(executor.submit(self._process_with_retry, agent, previous_responses_str))
                for future in as_completed(futures):
                    response = future.result()
                    self.responses.append(response)

        if self.moderated and self.moderator:
            moderated_response = self.moderator._moderate_responses(
                self.responses, original_task)
            self.responses.append(moderated_response)
        self.final_response = self.responses[-1]

    @staticmethod
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), 
           retry=retry_if_exception_type(RateLimitError))
    def _process_with_retry(agent, previous_responses_str):
        """
        Process an agent's task with retries in case of rate limit errors.

        Args:
            agent (Agent): The agent to process the task.
            previous_responses_str (str): Previous responses to incorporate into the task.

        Returns:
            str: The response from the agent.
        """
        try:
            return agent.process(previous_responses=previous_responses_str)
        except RateLimitError as e:
            print("Try increasing num_retries in kwargs of model")
            raise e
josh-ashkinaze commented 1 month ago

Unit tests:

To do this you want to use mocking to simulate the behavior of an error....I wrote some other unit tests that do this kind of thing (use mocking to check for errors being handled correctly)

josh-ashkinaze commented 1 month ago

Okay based on our meeting: