explodinggradients / ragas

Evaluation framework for your Retrieval Augmented Generation (RAG) pipelines
https://docs.ragas.io
Apache License 2.0
6.48k stars 629 forks source link

getting "asyncio.exceptions.CancelledError" error whenever I run a simple script using ragas #1102

Open edgekid opened 1 month ago

edgekid commented 1 month ago

[ ] I have checked the documentation and related resources and couldn't resolve my bug.

Describe the bug Getting "asyncio.exceptions.CancelledError" as an error

Ragas version: Python version: 3.10

Code to Reproduce

from datasets import Dataset
from ragas.metrics import answer_relevancy, faithfulness
from ragas import evaluate

data_samples = {
    'question': ["How are you?"],
    'answer': ["It's raining."],
    'contexts' : [["I'm feeling sad today. It's raining and I can't go outside."]],
}
dataset = Dataset.from_dict(data_samples)
score = evaluate(dataset, metrics=[faithfulness], raise_exceptions=False)
score.to_pandas()

Error trace

Evaluating:   0%|                                                                                                                                                                                                      | 0/1 [00:00<?, ?it/s]Runner in Executor raised an exception
Traceback (most recent call last):
  File "/*****/python3.10/site-packages/openai/_base_client.py", line 1599, in _request
    response.raise_for_status()
  File "/*****/python3.10/site-packages/httpx/_models.py", line 761, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '429 Too Many Requests' for url 'https://api.openai.com/v1/chat/completions'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/*****/python3.10/site-packages/openai/_base_client.py", line 1605, in _request
    return await self._retry_request(
  File "/*****/python3.10/site-packages/openai/_base_client.py", line 1605, in _request
    return await self._retry_request(
  File "/*****/python3.10/asyncio/tasks.py", line 605, in sleep
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/*****/python3.10/site-packages/ragas/metrics/_faithfulness.py", line 248, in _ascore
    statements = await self.llm.generate(
  File "/*****/python3.10/site-packages/ragas/llms/base.py", line 93, in generate
    return await agenerate_text_with_retry(
  File "/*****/python3.10/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped
    return await copy(fn, *args, **kwargs)
  File "/*****/python3.10/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__
    do = await self.iter(retry_state=retry_state)
  File "/*****/python3.10/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
    result = await action(retry_state)
  File "/*****/python3.10/site-packages/tenacity/_utils.py", line 99, in inner
    return call(*args, **kwargs)
  File "/*****/python3.10/site-packages/tenacity/__init__.py", line 398, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/*****/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/*****/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/*****/python3.10/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__
    result = await fn(*args, **kwargs)
  File "/*****/python3.10/site-packages/ragas/llms/base.py", line 170, in agenerate_text
    return await self.langchain_llm.agenerate_prompt(
  File "/*****/python3.10/site-packages/langchain_core/language_models/chat_models.py", line 708, in agenerate_prompt
    return await self.agenerate(
  File "/*****/python3.10/site-packages/langchain_core/language_models/chat_models.py", line 634, in agenerate
    results = await asyncio.gather(
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/*****/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/*****/python3.10/site-packages/ragas/executor.py", line 104, in wrapped_callable_async
    result = await callable(*args, **kwargs)
  File "/*****/python3.10/site-packages/ragas/metrics/base.py", line 134, in ascore
    raise e
  File "/*****/python3.10/site-packages/ragas/metrics/base.py", line 127, in ascore
    score = await asyncio.wait_for(
  File "/*****/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
Evaluating: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [01:20<00:00, 80.01s/it]
/*****/python3.10/site-packages/ragas/evaluation.py:304: RuntimeWarning: Mean of empty slice
  value = np.nanmean(self.scores[cn])
{'faithfulness': nan}

Expected behavior Expected to output the faithfulness score without any errors.

Additional context Add any other context about the problem here.

dosubot[bot] commented 1 month ago

Hey, @edgekid! I'm here to assist you with any bugs, questions, or contributions you may have. Let's dive in and tackle this issue together!

To resolve the "asyncio.exceptions.CancelledError" error when running your script using the Ragas library, you can modify the Runner class to properly handle the cancellation of tasks. Specifically, ensure that the event loop is properly closed and that any pending tasks are cancelled.

Here is a modified version of the Runner class with added handling for task cancellation:

class Runner(threading.Thread):
    def __init__(
        self,
        jobs: t.List[t.Tuple[t.Coroutine, str]],
        desc: str,
        keep_progress_bar: bool = True,
        raise_exceptions: bool = True,
        run_config: t.Optional[RunConfig] = None,
    ):
        super().__init__()
        self.jobs = jobs
        self.desc = desc
        self.keep_progress_bar = keep_progress_bar
        self.raise_exceptions = raise_exceptions
        self.run_config = run_config or RunConfig()

        # create task
        self.loop = asyncio.new_event_loop()
        self.futures = as_completed(
            loop=self.loop,
            coros=[coro for coro, _ in self.jobs],
            max_workers=self.run_config.max_workers,
        )

    async def _aresults(self) -> t.List[t.Any]:
        results = []
        for future in tqdm(
            self.futures,
            desc=self.desc,
            total=len(self.jobs),
            leave=self.keep_progress_bar,
        ):
            r = (-1, np.nan)
            try:
                r = await future
            except MaxRetriesExceeded as e:
                logger.warning(f"max retries exceeded for {e.evolution}")
            except asyncio.CancelledError:
                logger.warning("Task was cancelled")
            except Exception as e:
                if self.raise_exceptions:
                    raise e
                else:
                    logger.error(
                        "Runner in Executor raised an exception", exc_info=True
                    )
            results.append(r)

        return results

    def run(self):
        results = []
        try:
            results = self.loop.run_until_complete(self._aresults())
        except asyncio.CancelledError:
            logger.warning("Event loop was cancelled")
        finally:
            self.results = results
            self.loop.run_until_complete(self.loop.shutdown_asyncgens())
            self.loop.close()

In this updated version, the asyncio.CancelledError is caught and logged in the _aresults method. Additionally, in the run method, the event loop is properly closed by calling self.loop.shutdown_asyncgens() and self.loop.close() in the finally block. This ensures that any pending tasks are properly cancelled and the event loop is cleanly shut down [1].

Alternatively, you can use the run_async_tasks function provided in the Ragas library's async_utils.py to handle running async tasks with proper error handling and optional progress bar support:

from ragas.async_utils import run_async_tasks

tasks = [coro for coro, _ in jobs]
results = run_async_tasks(tasks, show_progress=True, progress_bar_desc="Running async tasks")

This function will handle the event loop and task execution, including proper handling of asyncio.CancelledError and other exceptions [2].

Additionally, ensure that you have proper rate limiting and retry mechanisms in place when interacting with the OpenAI API to avoid HTTPStatusError: Client error '429 Too Many Requests'. You can configure these settings using the RunConfig class:

@dataclass
class RunConfig:
    timeout: int = 60
    max_retries: int = 10
    max_wait: int = 60
    max_workers: int = 16
    thread_timeout: float = 80.0
    exception_types: t.Union[
        t.Type[BaseException],
        t.Tuple[t.Type[BaseException], ...],
    ] = (Exception,)
    log_tenacity: bool = False

Ensure that the timeout and max_retries attributes are set appropriately to handle rate limiting issues [3][4][5].

To continue talking to Dosu, mention @dosu.

jyotsanaSingh-git commented 1 month ago

Hi @edgekid were you able to solve this?

jjmachan commented 1 month ago

@edgekid @jyotsanaSingh-git can you keep an eye on #1156 will put up a fix there. The issue is the default timeouts

btw which models are you using?