run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: How to add memory or cache into a packed query pipeline? #10103

Closed xjw1001002 closed 9 months ago

xjw1001002 commented 9 months ago

Question

Hi all,

I am looking to build a RAG chatbot with an advanced query pipeline. How can I add previous conversation memory to a packed query pipeline or query engine?

dosubot[bot] commented 9 months ago

🤖

Hey @xjw1001002!

I'm Dosu, a friendly bot here to assist you with any questions and issues you might have concerning LlamaIndex. I'll do my best to provide useful solutions while you're waiting for a human maintainer. Don't hesitate to ask anything!

To add conversation memory to a packed query pipeline or query engine, you can follow the approach suggested in a similar issue in the LlamaIndex repository.

One option is to wrap the query engine in a tool and hand it to an agent, which keeps the chat history for you. Here's the code snippet provided:

from llama_index.agent import OpenAIAgent
from llama_index.tools import QueryEngineTool

# sub_question_query_engine is assumed to be an existing query engine.
tool = QueryEngineTool.from_defaults(
    sub_question_query_engine,
    name="query",
    description="Useful for looking up information about X",
)
agent = OpenAIAgent.from_tools([tool], verbose=True)
agent.chat("Tell me something about X")

This approach will use the chat history to rewrite the initial question to the query engine, and all question generation, RAG, and final answer synthesis will be based on the question that was re-written using the chat history. You can find more details in this issue.

Additionally, if you need to access the generated sub-questions and their answers, the sub question engine has its own top-level response step that synthesizes a final response, and the available sub-responses are in the response.source_nodes attribute. You could create a custom tool to expose this to the agent, or consider giving the agent access to the sub-indexes directly.

For a more advanced approach, you can use the Deep Memory feature available through LlamaIndex's Deep Lake vector store integration. This involves generating questions and relevance labels for your training and test datasets, training the Deep Memory model, evaluating its performance, and creating a query engine with Deep Memory enabled. You can find more details in this notebook.

The OpenAIAgent class in the LlamaIndex codebase is a key component that manages the interaction between the user, the language model (LLM), and the tools (like QueryEngineTool). It is responsible for handling the chat history, calling functions on tools, and managing the responses from the LLM and tools. You can find more details in the source code.

I hope this helps! If you have any more questions, feel free to ask.

This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

If you want to continue the conversation, start your reply with @dosu-bot.

xjw1001002 commented 9 months ago

@dosu-bot Suppose I already have the pipeline:

class RAGFusionPipelinePack(BaseLlamaPack):
    """RAG Fusion pipeline.

    Create a bunch of vector indexes of different chunk sizes.

    Suppose the fusionRAG DB has already been created by ingest_md_fusionRAG.py
    """

    def __init__(
        self,
        #documents: List[Document],
        path: str = "Vector_DB_storage/fusionRAG",
        embed_model = None,
        llm: Optional[AzureOpenAI] = None,
        llm_new: Optional[AzureOpenAI] = None,
        chunk_sizes: Optional[List[int]] = None,
    ) -> None:
        """Init params."""
        #self.documents = documents
        self.chunk_sizes = chunk_sizes or DEFAULT_CHUNK_SIZES

        # construct index
        self.llm = llm or OpenAI(model="gpt-3.5-turbo")
        self.llm_new = llm_new or OpenAI(model="gpt-3.5-turbo")
        self.embed_model = embed_model
        self.query_engines = []
        self.retrievers = {}
        self.path = path
        for chunk_size in self.chunk_sizes:

            service_context = ServiceContext.from_defaults(llm=self.llm, embed_model = self.embed_model)
            persist_dir = os.path.join(self.path, str(chunk_size))
            storage_context = StorageContext.from_defaults(persist_dir=persist_dir)
            vector_index = load_index_from_storage(storage_context, service_context=service_context)
            self.query_engines.append(vector_index.as_query_engine())
            self.retrievers[str(chunk_size)] = vector_index.as_retriever()
            if chunk_size == max(self.chunk_sizes):
                self.retrievers["bm25"] = BM25Retriever.from_defaults(
                    docstore=vector_index.docstore, similarity_top_k=3
                )

        # define rerank component
        rerank_component = FnComponent(fn=reciprocal_rank_fusion)

        # construct query pipeline
        p = QueryPipeline(verbose = False)
        module_dict = {
            **self.retrievers,
            "input": InputComponent(),
            "summarizer": TreeSummarize(),
            # NOTE: Join args
            "join": ArgPackComponent(),
            "reranker": rerank_component,
        }
        p.add_modules(module_dict)
        # add links from input to retriever (id'ed by chunk_size)

        for chunk_size in self.chunk_sizes:
            p.add_link("input", str(chunk_size))
            p.add_link(str(chunk_size), "join", dest_key=str(chunk_size))

        p.add_link("input", "bm25")
        p.add_link("bm25", "join", dest_key="bm25")

        p.add_link("join", "reranker")
        p.add_link("input", "summarizer", dest_key="query_str")
        p.add_link("reranker", "summarizer", dest_key="nodes")

        self.query_pipeline = p

    def get_modules(self) -> Dict[str, Any]:
        """Get modules."""
        return {
            "llm": self.llm,
            "retrievers": self.retrievers,
            "query_engines": self.query_engines,
            "query_pipeline": self.query_pipeline,
        }

    def run(self, *args: Any, **kwargs: Any) -> Any:
        """Run the pipeline."""
        return self.query_pipeline.run(*args, **kwargs)

I use it as response_raw = pack.run(query=query). How can I add history to it?

dosubot[bot] commented 9 months ago

🤖

To incorporate previous conversation memory into your packed query pipeline, you can modify your run method to accept an additional parameter for the conversation history. This history can then be included in the query that is passed to the pipeline. Here's an example of how you can do this:

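def run(self, *args: Any, history: Optional[str] = None, **kwargs: Any) -> Any:
    """Run the pipeline, optionally prefixing the query with chat history."""
    if history and "query" in kwargs:
        # Separate history from the question so the two do not run together.
        kwargs["query"] = f"{history}\n{kwargs['query']}"
    return self.query_pipeline.run(*args, **kwargs)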

In this example, history is a string that contains the previous conversation history. It is prepended to the current query before the query is passed to the pipeline. This allows the pipeline to consider the conversation history when processing the query.

You can then use this modified run method like this:

history = "Previous conversation history."
response_raw = pack.run(query=query, history=history)

This will include the conversation history in the query that is processed by the pipeline.

Please note that this is a simple example and may not work perfectly for your use case. Depending on how your pipeline is set up, you may need to modify this approach to better suit your needs. For example, you might need to format the history and the query in a specific way for them to be properly processed by the pipeline.
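As one possible way to format the history, the sketch below renders a list of turns as role-tagged lines, keeps only the most recent ones, and places the current question last. The names format_history, build_query, and max_turns are hypothetical and not part of LlamaIndex; the result is a plain string that could be passed as query to pack.run.

```python
from typing import List, Tuple

Turn = Tuple[str, str]  # (role, text), e.g. ("user", "What is X?")


def format_history(turns: List[Turn], max_turns: int = 6) -> str:
    """Render the most recent turns as 'role: text' lines."""
    recent = turns[-max_turns:] if max_turns > 0 else []
    return "\n".join(f"{role}: {text}" for role, text in recent)


def build_query(turns: List[Turn], query: str, max_turns: int = 6) -> str:
    """Prefix the current question with formatted history, if there is any."""
    history = format_history(turns, max_turns)
    if not history:
        return query
    return (
        "Conversation so far:\n"
        f"{history}\n\n"
        f"Current question: {query}"
    )


turns = [
    ("user", "What is RAG fusion?"),
    ("assistant", "It merges results from several retrievers."),
]
combined = build_query(turns, "How does it rank them?")
```

Because the pipeline above links "input" to every retriever as well as to the summarizer, the combined string is also what gets used for retrieval, so a long history may dilute the search. Keeping max_turns small is one way to limit that.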

I hope this helps! If you have any further questions, please let me know.

logan-markewich commented 9 months ago

@xjw1001002 it's probably helpful to think about how you would want this to work.

It's not currently supported, but I'm wondering what the flow would look like ideally here?

logan-markewich commented 9 months ago

(Or at least, it could be supported if you wrote a custom component)
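A rough sketch of what the logic of such a component might look like, kept in plain Python so it runs without an index. HistoryPrefixer and its methods are hypothetical names, and the wiring in the trailing comments is an untested assumption that FnComponent accepts any callable with a single required argument, the same way the pack wraps reciprocal_rank_fusion.

```python
from collections import deque
from typing import Deque, Tuple


class HistoryPrefixer:
    """Stateful callable that folds stored turns into the incoming query."""

    def __init__(self, max_turns: int = 4) -> None:
        # deque(maxlen=...) silently drops the oldest exchange when full.
        self.turns: Deque[Tuple[str, str]] = deque(maxlen=max_turns)

    def record(self, question: str, answer: str) -> None:
        """Store a finished question/answer exchange."""
        self.turns.append((question, answer))

    def __call__(self, query_str: str) -> str:
        """Return the query unchanged on the first turn, else with context."""
        if not self.turns:
            return query_str
        context = "\n".join(f"Q: {q}\nA: {a}" for q, a in self.turns)
        return f"{context}\nQ: {query_str}"


prefixer = HistoryPrefixer(max_turns=2)
first = prefixer("What is BM25?")
prefixer.record("What is BM25?", "A lexical ranking function.")
second = prefixer("How is it combined with vectors?")

# Hypothetical wiring, not executed here:
#   module_dict["history"] = FnComponent(fn=prefixer)
#   p.add_link("input", "history")
#   p.add_link("history", "summarizer", dest_key="query_str")
```

Linking the rewritten query only to the summarizer, as in the comment, would leave retrieval driven by the raw question; whether that is the desired flow is exactly the design question raised above.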

xjw1001002 commented 9 months ago

> (Or at least, it could be supported if you wrote a custom component)

The second part of the bot's reply solved my question. For more complex history, we can probably use LangChain memory or a comparable module.
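For reference, a minimal hand-rolled alternative to a full memory module, assuming the pack's run method accepts the history keyword from the bot's second reply. ChatSession and EchoPack are hypothetical names; EchoPack only stands in for the real pack so the sketch runs on its own.

```python
from collections import deque
from typing import Any, Deque, Tuple


class ChatSession:
    """Keeps a bounded window of turns and feeds it to pack.run()."""

    def __init__(self, pack: Any, max_turns: int = 5) -> None:
        self.pack = pack
        self.turns: Deque[Tuple[str, str]] = deque(maxlen=max_turns)

    def _history_text(self) -> str:
        """Render stored turns; returns an empty string on the first turn."""
        return "".join(f"User: {q}\nAssistant: {a}\n" for q, a in self.turns)

    def ask(self, query: str) -> str:
        # Assumption: run() takes `history` as a keyword argument.
        response = self.pack.run(query=query, history=self._history_text())
        answer = str(response)
        self.turns.append((query, answer))
        return answer


class EchoPack:
    """Stand-in for the real pack; reports how much history it received."""

    def run(self, query: str, history: str = "") -> str:
        return f"[{len(history)} chars of history] {query}"


session = ChatSession(EchoPack(), max_turns=2)
a1 = session.ask("What is X?")
a2 = session.ask("And Y?")
```

With the real pack, session = ChatSession(pack) followed by repeated session.ask(...) calls would replace the direct pack.run(query=query) call.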

logan-markewich commented 9 months ago

Sounds good!