explodinggradients / ragas

Supercharge Your LLM Application Evaluations 🚀
https://docs.ragas.io
Apache License 2.0
7.24k stars 743 forks source link

How to turn off asynchronous when run local LLM and Embeddings v2 #1254

Open minglong-huang opened 2 months ago

minglong-huang commented 2 months ago

Here is my code:

import typing as t
import asyncio
from typing import List
from datasets import load_dataset, load_from_disk
from ragas.metrics import faithfulness, context_recall, context_precision
from ragas.metrics import AnswerRelevancy
from ragas import evaluate
from ragas.llms import BaseRagasLLM
from langchain.schema import LLMResult
from langchain.schema import Generation
from langchain.callbacks.base import Callbacks
from langchain.schema.embeddings import Embeddings
from transformers import AutoModel, AutoTokenizer
from ragas.llms.prompt import PromptValue
from llama_index.llms.ollama import Ollama
from llama_index.core import Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from FlagEmbedding import FlagModel
from FlagEmbedding import BGEM3FlagModel
from ragas.metrics import answer_relevancy
from langchain_core.language_models import BaseLanguageModel
from langchain_core.embeddings import Embeddings
from ragas.llms import BaseRagasLLM, LangchainLLMWrapper
from ragas.embeddings import BaseRagasEmbeddings
import asyncio
import traceback
from datasets import Dataset
from ragas.embeddings import LangchainEmbeddingsWrapper
import torch
from ragas.run_config import RunConfig, add_async_retry, add_retry
from abc import ABC

class MyLLM(BaseRagasLLM):

    def __init__(self,llm_path):
        self.tokenizer = AutoTokenizer.from_pretrained(llm_path, trust_remote_code=True)
        self.base_llm = AutoModel.from_pretrained(llm_path, trust_remote_code=True)
        self.base_llm = self.base_llm
        self.base_llm = self.base_llm.eval()

    @property
    def llm(self):
        return self.base_llm

    def get_llm_result(self, prompt):
        generations = []
        llm_output = {}
        token_total = 0
        content = prompt.to_string()

        print(content)
        text, history = self.base_llm.chat(self.tokenizer, content, history=[])
        print(f'*'*15)
        print(("Generated text: %s", text))
        generations.append([Generation(text=text)])
        token_total += len(text)
        llm_output['token_total'] = token_total
        return LLMResult(generations=generations, llm_output=llm_output)

    def generate_text(
            self,
            prompt: PromptValue,
            n: int = 1,
            temperature: float = 1e-8,
            stop: t.Optional[t.List[str]] = None,
            callbacks: Callbacks = [],
    ):
        print(f'runing generate_text function...')
        result = self.get_llm_result(prompt)
        return result

    async def agenerate_text(
            self,
            prompt: PromptValue,
            n: int = 1,
            temperature: float = 1e-8,
            stop: t.Optional[t.List[str]] = None,
            callbacks: Callbacks = [],
    ) -> LLMResult:
        generations = []
        llm_output = {}
        token_total = 0
        content = prompt.to_string()
        try:
            # text, history = await asyncio.get_event_loop().run_in_executor(None, self.base_llm.chat, self.tokenizer,
            #                                                            content, [])

            text, history = await asyncio.wait_for(
                asyncio.get_event_loop().run_in_executor(None, self.base_llm.chat, self.tokenizer,content),
                timeout=42  # 例如,设置超时时间为60秒
            )
        except asyncio.TimeoutError:
            print("操作超时,请检查代码或增加超时时间")
        except asyncio.CancelledError:
            print("任务被取消,请检查代码")
            info = traceback.format_exc()
            print(f"info ={info}")
        except Exception as e:
            print(f"发生未知错误:{e}")

        generations.append([Generation(text=text)])
        token_total += len(text)
        llm_output['token_total'] = token_total
        result = LLMResult(generations=generations, llm_output=llm_output)
        return result

    async def generate(
            self,
            prompt: PromptValue,
            n: int = 1,
            temperature: t.Optional[float] = None,
            stop: t.Optional[t.List[str]] = None,
            callbacks: Callbacks = None,
            is_async: bool = True,
    ) -> LLMResult:
        if temperature is None:
            temperature = 1e-8
        if is_async:
            return await self.agenerate_text(prompt, n, temperature, stop, callbacks)
        else:
            return self.generate_text(prompt, n, temperature, stop, callbacks)

class TestEmbedding(Embeddings, ABC):
    run_config: RunConfig
    def __init__(self,model_path):
        self.embed_texts = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    def embed_text(self, text: str) -> List[float]:
        embs = self.embed_texts([text])
        return embs[0]

    def embed_texts(self, texts: List[str]) -> t.List[t.List[float]]:

        # loop = asyncio.get_event_loop()
        embed_documents_with_retry = add_retry(
            self.embed_documents, self.run_config
        )
        return embed_documents_with_retry(texts)

    async def aembed_text(self, text: str, is_async=True) -> List[float]:
        embs = await self.embed_texts([text], is_async=False)
        return embs[0]

    async def aembed_texts(
        self, texts: List[str], is_async: bool = True
    ) -> t.List[t.List[float]]:
        if is_async:
            aembed_documents_with_retry = add_async_retry(
                self.aembed_documents, self.run_config
            )
            return await aembed_documents_with_retry(texts)
        else:
            loop = asyncio.get_event_loop()
            embed_documents_with_retry = add_retry(
                self.embed_documents, self.run_config
            )
            return await loop.run_in_executor(None, embed_documents_with_retry, texts)

    def set_run_config(self, run_config: RunConfig):
        self.run_config = run_config

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return self.embed_texts.encode_corpus(texts, self.batch_size, self.max_length).tolist()

    def embed_query(self, text: str) -> List[float]:
        return self.embed_texts.encode_queries(text, self.batch_size, self.max_length).tolist()
#数据
data_path = "/home/kelvin/nlp/graphrag/eval_dataset/amnesty_qa"
amnesty_qa = load_dataset("/home/kelvin/nlp/graphrag/eval_dataset/amnesty_qa")

# MODEL_PATH = '/home/kelvin/nlp/model/LLM/THUDM/glm-4-9b-chat'
# MODEL_PATH = '/home/kelvin/nlp/model/LLM/Qwen/Qwen1.5-32B'
MODEL_PATH = '/home/kelvin/nlp/model/LLM/THUDM/glm-4-9b-chat'

embed_model_path = '/home/kelvin/nlp/model/Embedding/BAAI/bge-m3'
# embedding_model = MyEmbedding(embed_model_path)
embedding_model = TestEmbedding(embed_model_path)
my_llm = MyLLM(MODEL_PATH)

# Wrap the custom LLM and Embeddings
# my_llm = LangchainLLMWrapper(my_llm)
# embedding_model = LangchainEmbeddingsWrapper(embedding_model)

ans_relevancy = AnswerRelevancy()

data_samples = {
    'question': ['When was the first super bowl?', 'Who won the most super bowls?'],
    'answer': ['The first superbowl was held on Jan 15, 1967', 'The most super bowls have been won by The New England Patriots'],
    'contexts' : [['The First AFL–NFL World Championship Game was an American football game played on January 15, 1967, at the Los Angeles Memorial Coliseum in Los Angeles,'],
    ['The Green Bay Packers...Green Bay, Wisconsin.','The Packers compete...Football Conference']],
    'ground_truth': ['The first superbowl was held on January 15, 1967', 'The New England Patriots have won the Super Bowl a record six times']
}

dataset = Dataset.from_dict(data_samples)
# amnesty_qa["eval"],
result = evaluate(
    dataset,
    metrics=[context_recall, context_precision, ans_relevancy, faithfulness],
    llm=my_llm,
    embeddings=embedding_model,
    raise_exceptions=True,
    is_async=False
)

df = result.to_pandas()
print(df.head())
df.to_csv("result.csv", index=False)

# print(result)

and i have set is_async=False ,but it still run async def agenerate_text( self, prompt: PromptValue, n: int = 1, temperature: float = 1e-8, stop: t.Optional[t.List[str]] = None, callbacks: Callbacks = [], ) .

and then report a error

Traceback (most recent call last):
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/ragas/executor.py", line 75, in run
    results = self.loop.run_until_complete(self._aresults())
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/asyncio/base_events.py", line 641, in run_until_complete
    return future.result()
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/ragas/executor.py", line 63, in _aresults
    raise e
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/ragas/executor.py", line 58, in _aresults
    r = await future
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/asyncio/tasks.py", line 575, in _wait_for_one
    return f.result()  # May raise f.exception().
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/ragas/executor.py", line 91, in wrapped_callable_async
    return counter, await callable(*args, **kwargs)
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/ragas/metrics/base.py", line 91, in ascore
    raise e
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/ragas/metrics/base.py", line 87, in ascore
    score = await self._ascore(row=row, callbacks=group_cm, is_async=is_async)
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/ragas/metrics/_context_recall.py", line 113, in _ascore
    result = await self.llm.generate(
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/ragas/llms/base.py", line 92, in generate
    return await agenerate_text_with_retry(
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped
    return await copy(fn, *args, **kwargs)
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__
    do = await self.iter(retry_state=retry_state)
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
    result = await action(retry_state)
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/tenacity/_utils.py", line 99, in inner
    return call(*args, **kwargs)
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/tenacity/__init__.py", line 418, in exc_check
    raise retry_exc.reraise()
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/tenacity/__init__.py", line 185, in reraise
    raise self.last_attempt.result()
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/concurrent/futures/_base.py", line 438, in result
    return self.__get_result()
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__
    result = await fn(*args, **kwargs)
  File "/home/kelvin/nlp/graphrag/eval.py", line 106, in agenerate_text
    generations.append([Generation(text=text)])
UnboundLocalError: local variable 'text' referenced before assignment
Traceback (most recent call last):
  File "/home/kelvin/nlp/graphrag/eval.py", line 205, in <module>
    result = evaluate(
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/ragas/evaluation.py", line 231, in evaluate
    raise e
  File "/home/kelvin/anaconda3/envs/nlp/lib/python3.10/site-packages/ragas/evaluation.py", line 213, in evaluate
    raise ExceptionInRunner()
ragas.exceptions.ExceptionInRunner: The runner thread which was running the jobs raised an exeception. Read the traceback above to debug it. You can also pass `raise_exception=False` incase you want to show only a warning message instead.
dosubot[bot] commented 2 months ago

I found a similar open issue that addresses turning off asynchronous execution when running a local LLM and Embeddings. The solution involves setting the is_async parameter to False in the generate method call of your LLM class and ensuring that the embed_texts method in your Embeddings class also handles the is_async parameter correctly.

Here is the modified code snippet for your MyLLM class:

class MyLLM(BaseRagasLLM):

    def __init__(self, llm_path):
        self.tokenizer = AutoTokenizer.from_pretrained(llm_path, trust_remote_code=True)
        self.base_llm = AutoModel.from_pretrained(llm_path, trust_remote_code=True)
        self.base_llm = self.base_llm.eval()

    @property
    def llm(self):
        return self.base_llm

    def get_llm_result(self, prompt):
        generations = []
        llm_output = {}
        token_total = 0
        content = prompt.to_string()

        print(content)
        text, history = self.base_llm.chat(self.tokenizer, content, history=[])
        print(f'*'*15)
        print(("Generated text: %s", text))
        generations.append([Generation(text=text)])
        token_total += len(text)
        llm_output['token_total'] = token_total
        return LLMResult(generations=generations, llm_output=llm_output)

    def generate_text(
            self,
            prompt: PromptValue,
            n: int = 1,
            temperature: float = 1e-8,
            stop: t.Optional[t.List[str]] = None,
            callbacks: Callbacks = [],
    ):
        print(f'Running generate_text function...')
        result = self.get_llm_result(prompt)
        return result

    async def agenerate_text(
            self,
            prompt: PromptValue,
            n: int = 1,
            temperature: float = 1e-8,
            stop: t.Optional[t.List[str]] = None,
            callbacks: Callbacks = [],
    ) -> LLMResult:
        generations = []
        llm_output = {}
        token_total = 0
        content = prompt.to_string()
        try:
            text, history = await asyncio.wait_for(
                asyncio.get_event_loop().run_in_executor(None, self.base_llm.chat, self.tokenizer, content),
                timeout=42
            )
        except asyncio.TimeoutError:
            print("Operation timed out, please check the code or increase the timeout duration")
            text = ""
        except asyncio.CancelledError:
            print("Task was cancelled, please check the code")
            info = traceback.format_exc()
            print(f"info = {info}")
            text = ""
        except Exception as e:
            print(f"An unknown error occurred: {e}")
            text = ""

        generations.append([Generation(text=text)])
        token_total += len(text)
        llm_output['token_total'] = token_total
        result = LLMResult(generations=generations, llm_output=llm_output)
        return result

    async def generate(
            self,
            prompt: PromptValue,
            n: int = 1,
            temperature: t.Optional[float] = None,
            stop: t.Optional[t.List[str]] = None,
            callbacks: Callbacks = None,
            is_async: bool = True,
    ) -> LLMResult:
        if temperature is None:
            temperature = 1e-8
        if is_async:
            return await self.agenerate_text(prompt, n, temperature, stop, callbacks)
        else:
            return self.generate_text(prompt, n, temperature, stop, callbacks)

For the TestEmbedding class, ensure you set is_async=False when calling the embed_texts method:

class TestEmbedding(Embeddings, ABC):
    run_config: RunConfig

    def __init__(self, model_path):
        self.embed_texts = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    def embed_text(self, text: str) -> List[float]:
        embs = self.embed_texts([text])
        return embs[0]

    def embed_texts(self, texts: List[str]) -> t.List[t.List[float]]:
        embed_documents_with_retry = add_retry(
            self.embed_documents, self.run_config
        )
        return embed_documents_with_retry(texts)

    async def aembed_text(self, text: str, is_async=True) -> List[float]:
        embs = await self.embed_texts([text], is_async=False)
        return embs[0]

    async def aembed_texts(
        self, texts: List[str], is_async: bool = True
    ) -> t.List[t.List[float]]:
        if is_async:
            aembed_documents_with_retry = add_async_retry(
                self.aembed_documents, self.run_config
            )
            return await aembed_documents_with_retry(texts)
        else:
            loop = asyncio.get_event_loop()
            embed_documents_with_retry = add_retry(
                self.embed_documents, self.run_config
            )
            return await loop.run_in_executor(None, embed_documents_with_retry, texts)

    def set_run_config(self, run_config: RunConfig):
        self.run_config = run_config

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return self.embed_texts.encode_corpus(texts, self.batch_size, self.max_length).tolist()

    def embed_query(self, text: str) -> List[float]:
        return self.embed_texts.encode_queries(text, self.batch_size, self.max_length).tolist()

By setting is_async=False, you ensure that the execution will be synchronous [1].

To continue talking to Dosu, mention @dosu.