geekan / MetaGPT

🌟 The Multi-Agent Framework: First AI Software Company, Towards Natural Language Programming
https://deepwisdom.ai/
MIT License
45.46k stars 5.41k forks

[Bug]: Rate Limit Exceeded in DDGAPIWrapper Search Function #1567

Open cnm13ryan opened 3 weeks ago

cnm13ryan commented 3 weeks ago

Describe the Bug

The original DDGAPIWrapper raises a DuckDuckGoSearchException when DuckDuckGo's API rate limits are exceeded, preventing the application from retrieving search results.

Error Message

duckduckgo_search.exceptions.DuckDuckGoSearchException: _get_url() https://duckduckgo.com/ DuckDuckGoSearchException: Ratelimit

Steps to Reproduce

Sample Code of the original DDGAPIWrapper:

import asyncio, json
from concurrent import futures
from typing import Literal, Optional, overload
from pydantic import BaseModel, ConfigDict
from duckduckgo_search import DDGS

class DDGAPIWrapper(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    loop: Optional[asyncio.AbstractEventLoop] = None
    executor: Optional[futures.Executor] = None
    proxy: Optional[str] = None

    @property
    def ddgs(self):
        return DDGS(proxies=self.proxy)

    @overload
    def run(self, query: str, max_results: int = 8, as_string: Literal[True] = True, focus: list[str] | None = None) -> str:
        ...

    @overload
    def run(self, query: str, max_results: int = 8, as_string: Literal[False] = False, focus: list[str] | None = None) -> list[dict[str, str]]:
        ...

    async def run(self, query: str, max_results: int = 8, as_string: bool = True) -> str | list[dict]:
        loop = self.loop or asyncio.get_event_loop()
        future = loop.run_in_executor(self.executor, self._search_from_ddgs, query, max_results)
        search_results = await future

        if as_string:
            return json.dumps(search_results, ensure_ascii=False)
        return search_results

    def _search_from_ddgs(self, query: str, max_results: int):
        return [
            {"link": i["href"], "snippet": i["body"], "title": i["title"]}
            for (_, i) in zip(range(max_results), self.ddgs.text(query))
        ]

if __name__ == "__main__":
    import fire
    fire.Fire(DDGAPIWrapper().run)

Expected Behavior

The DDGAPIWrapper.run method should perform DuckDuckGo searches and return the specified number of results without triggering rate limit errors, even under multiple concurrent requests.

Actual Behavior

The method intermittently raises a DuckDuckGoSearchException with the message Ratelimit, indicating that the application is sending too many requests to the DuckDuckGo API in a short period.

Analysis

Cause: The original script lacks mechanisms to control the frequency of API requests. Rapid or concurrent calls to the run method exceed DuckDuckGo's rate limits, resulting in exceptions.

Impact: Failed search operations disrupt the application's functionality, leading to an inability to retrieve necessary search results.

Proposed Solution

Enhance the DDGAPIWrapper by implementing rate limiting, retry mechanisms, and caching to prevent exceeding DuckDuckGo's API rate limits.
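To make the rate-limiting part of the idea concrete, here is a minimal stdlib-only sketch of how pacing concurrent calls avoids request bursts. The actual patch below uses aiolimiter's AsyncLimiter; SimpleRateLimiter and fake_search are illustrative names invented for this sketch, not part of the patch.

```python
import asyncio
import time

class SimpleRateLimiter:
    """Minimal stdlib analogue of aiolimiter.AsyncLimiter: allow at
    most one acquisition per `period` seconds (illustration only)."""

    def __init__(self, period: float = 1.0):
        self.period = period
        self._lock = asyncio.Lock()
        self._last: float | None = None

    async def __aenter__(self):
        async with self._lock:
            now = time.monotonic()
            if self._last is not None:
                wait = self._last + self.period - now
                if wait > 0:
                    await asyncio.sleep(wait)
            self._last = time.monotonic()

    async def __aexit__(self, *exc):
        return False

async def main():
    limiter = SimpleRateLimiter(period=0.1)

    async def fake_search(i: int) -> int:
        # Placeholder for the real DDG call; the limiter paces entries
        # so even concurrent tasks hit the API one at a time.
        async with limiter:
            return i

    start = time.monotonic()
    results = await asyncio.gather(*(fake_search(i) for i in range(3)))
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")  # three calls spread over ~0.2s
```

Even though all three tasks start at once, the limiter serializes them with a fixed spacing, which is exactly the behavior AsyncLimiter(max_rate=1, time_period=1) provides in the patch below.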

Modified DDGAPIWrapper with rate limiting, retries, and caching:

import asyncio, json
from concurrent import futures
from typing import Literal, Optional, overload
from pydantic import BaseModel, ConfigDict
from aiolimiter import AsyncLimiter
from aiocache import cached, Cache
from aiocache.serializers import JsonSerializer
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from duckduckgo_search import DDGS

# Custom exception for DuckDuckGo search errors (note: this shadows
# the duckduckgo_search library's own DuckDuckGoSearchException name)
class DuckDuckGoSearchException(Exception):
    pass

class DDGAPIWrapper(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    loop: Optional[asyncio.AbstractEventLoop] = None
    executor: Optional[futures.Executor] = None
    proxy: Optional[str] = None
    rate_limiter: AsyncLimiter = AsyncLimiter(max_rate=1, time_period=1)

    @property
    def ddgs(self):
        return DDGS(proxies=self.proxy)

    @overload
    def run(self, query: str, max_results: int = 8, as_string: Literal[True] = True, focus: list[str] | None = None) -> str:
        ...

    @overload
    def run(self, query: str, max_results: int = 8, as_string: Literal[False] = False, focus: list[str] | None = None) -> list[dict[str, str]]:
        ...

    @retry(
        retry=retry_if_exception_type(DuckDuckGoSearchException),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        stop=stop_after_attempt(5),
        reraise=True
    )
    async def run(self, query: str, max_results: int = 8, as_string: bool = True) -> str | list[dict]:
        loop = self.loop or asyncio.get_event_loop()

        async with self.rate_limiter:
            try:
                future = loop.run_in_executor(self.executor, self._search_from_ddgs, query, max_results)
                search_results = await future
            except Exception as e:
                if 'Ratelimit' in str(e):
                    print(f"Rate limit hit for query: {query}. Retrying...")
                    raise DuckDuckGoSearchException("Rate limit exceeded") from e
                else:
                    print(f"An error occurred: {e}")
                    raise DuckDuckGoSearchException("An error occurred during search") from e

        if as_string:
            return json.dumps(search_results, ensure_ascii=False)
        return search_results

    def _search_from_ddgs(self, query: str, max_results: int):
        return [
            {"link": i["href"], "snippet": i["body"], "title": i["title"]}
            for (_, i) in zip(range(max_results), self.ddgs.text(query))
        ]

    @cached(ttl=300, cache=Cache.MEMORY, serializer=JsonSerializer())
    async def cached_run(self, query: str, max_results: int = 8, as_string: bool = True) -> str | list[dict]:
        """Cached version of the run method."""
        return await self.run(query, max_results, as_string)

if __name__ == "__main__":
    import fire
    api_wrapper = DDGAPIWrapper()
    fire.Fire(api_wrapper.cached_run)
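The caching layer is what keeps repeated identical queries from consuming the rate budget at all. A stdlib-only sketch of the same mechanism (the patch itself uses aiocache's @cached; async_ttl_cache and fake_run here are hypothetical names for illustration):

```python
import asyncio
import functools
import json
import time

def async_ttl_cache(ttl: float):
    """Minimal stdlib analogue of aiocache's @cached decorator:
    memoize an async function's result per argument tuple for `ttl`
    seconds (illustration only)."""
    def decorator(func):
        store: dict = {}  # args -> (expiry, value)

        @functools.wraps(func)
        async def wrapper(*args):
            now = time.monotonic()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]  # fresh cache entry: no API call made
            value = await func(*args)
            store[args] = (now + ttl, value)
            return value

        return wrapper
    return decorator

calls = 0

@async_ttl_cache(ttl=300)
async def fake_run(query: str) -> str:
    # Stand-in for DDGAPIWrapper.run; counts real invocations.
    global calls
    calls += 1
    return json.dumps([{"title": query}])

async def main():
    first = await fake_run("metagpt")
    second = await fake_run("metagpt")  # served from the cache
    return first, second

first, second = asyncio.run(main())
print(calls)  # 1
```

Only the first call for a given query reaches the (simulated) API within the TTL window; the second is answered from memory, which is the effect cached_run has on repeated DDG queries.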

Outcome

The application seems to have successfully mitigated the rate limit issues.

I say "seems" because I am running models served through Ollama API endpoints and have encountered token-counting issues there, which the Ollama community is still working on, so I have not been able to validate the fix fully on my setup.

I would therefore be grateful if anyone could test this proposed solution and confirm that it resolves the DDG rate-limit problem without introducing other errors.

Comments on the proposed fix are welcome.

shenchucheng commented 2 weeks ago

@cnm13ryan Thank you very much for your advice. Using tenacity is indeed an effective approach. However, I suggest defining a separate DDGSearchRatelimitException, and only retry when a DDGSearchRatelimitException is triggered, rather than retrying on any exception.
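A minimal stdlib sketch of that selective-retry idea (in the actual patch this would be tenacity's retry_if_exception_type(DDGSearchRatelimitException); retry_on_ratelimit and flaky_search are illustrative names invented here):

```python
import asyncio

class DDGSearchRatelimitException(Exception):
    """Raised only when DuckDuckGo reports a rate limit."""

async def retry_on_ratelimit(func, attempts: int = 5, base_delay: float = 0.01):
    """Retry `func` with exponential backoff, but only for
    DDGSearchRatelimitException; any other error propagates at once."""
    for attempt in range(attempts):
        try:
            return await func()
        except DDGSearchRatelimitException:
            if attempt == attempts - 1:
                raise  # budget exhausted: surface the rate-limit error
            await asyncio.sleep(base_delay * 2 ** attempt)

calls = 0

async def flaky_search():
    # Hypothetical search that is rate-limited on its first two calls.
    global calls
    calls += 1
    if calls < 3:
        raise DDGSearchRatelimitException("Ratelimit")
    return ["result"]

result = asyncio.run(retry_on_ratelimit(flaky_search))
print(result, calls)  # ['result'] 3
```

Because only the dedicated rate-limit exception is caught, genuine bugs (network failures, parsing errors) fail fast instead of being retried five times.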

shenchucheng commented 2 weeks ago

@cnm13ryan And, feel free to raise a PR :)