Open arunnuve opened 1 month ago
Hello @arunnuve! I'm here to help you with the issue you're facing. If you need assistance, feel free to ask.
To resolve the JSON and Key co-routine errors when using the SubQuestionQueryEngine
with the model microsoft/Phi-3-mini-4k-instruct
in a Retrieval-Augmented Generation (RAG) setup with llama-index
0.10.57, you need to ensure that the asynchronous methods are correctly implemented and that the JSON payloads are properly handled.
Here are the key points to check and correct:
Ensure Asynchronous Methods are Correctly Implemented:
aquery
method of the query engine is correctly awaited.asyncio.gather
is used properly to handle multiple asynchronous tasks.Proper Handling of JSON Payloads:
Here is a snippet of the relevant code with corrections:
async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
with self.callback_manager.event(
CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
) as query_event:
sub_questions = await self._question_gen.agenerate(
self._metadatas, query_bundle
)
colors = get_color_mapping([str(i) for i in range(len(sub_questions))])
if self._verbose:
print_text(f"Generated {len(sub_questions)} sub questions.\n")
tasks = [
self._aquery_subq(sub_q, color=colors[str(ind)])
for ind, sub_q in enumerate(sub_questions)
]
qa_pairs_all = await asyncio.gather(*tasks)
qa_pairs_all = cast(List[Optional[SubQuestionAnswerPair]], qa_pairs_all)
# filter out sub questions that failed
qa_pairs: List[SubQuestionAnswerPair] = list(filter(None, qa_pairs_all))
nodes = [self._construct_node(pair) for pair in qa_pairs]
source_nodes = [node for qa_pair in qa_pairs for node in qa_pair.sources]
response = await self._response_synthesizer.asynthesize(
query=query_bundle,
nodes=nodes,
additional_source_nodes=source_nodes,
)
query_event.on_end(payload={EventPayload.RESPONSE: response})
return response
async def _aquery_subq(
self, sub_q: SubQuestion, color: Optional[str] = None
) -> Optional[SubQuestionAnswerPair]:
try:
with self.callback_manager.event(
CBEventType.SUB_QUESTION,
payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
) as event:
question = sub_q.sub_question
query_engine = self._query_engines[sub_q.tool_name]
if self._verbose:
print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)
response = await query_engine.aquery(question)
response_text = str(response)
if self._verbose:
print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)
qa_pair = SubQuestionAnswerPair(
sub_q=sub_q, answer=response_text, sources=response.source_nodes
)
event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})
return qa_pair
except ValueError:
logger.warning(f"[{sub_q.tool_name}] Failed to run {question}")
return None
Ensure that the query_engine.aquery
method is correctly awaited and that the JSON payloads are correctly structured. This should help resolve the JSON and Key co-routine errors in your setup [1][2].
To continue talking to Dosu, mention @dosu.
To further address the JSON and Key co-routine errors, let's ensure the following:
Validate JSON Output:
Check for Proper Event Handling:
Run Asynchronous Tasks Correctly:
asyncio.gather
to run asynchronous tasks and ensure they are awaited.Here is an updated version of the code with additional error handling for JSON parsing:
from json import JSONDecodeError
async def _aquery_subq(
self, sub_q: SubQuestion, color: Optional[str] = None
) -> Optional[SubQuestionAnswerPair]:
try:
with self.callback_manager.event(
CBEventType.SUB_QUESTION,
payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
) as event:
question = sub_q.sub_question
query_engine = self._query_engines[sub_q.tool_name]
if self._verbose:
print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)
response = await query_engine.aquery(question)
response_text = str(response)
try:
response_json = json.loads(response_text)
except JSONDecodeError:
logger.warning(f"Invalid JSON from LLM: {response_text}")
return None
if self._verbose:
print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)
qa_pair = SubQuestionAnswerPair(
sub_q=sub_q, answer=response_text, sources=response.source_nodes
)
event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})
return qa_pair
except ValueError:
logger.warning(f"[{sub_q.tool_name}] Failed to run {question}")
return None
async def _aquery(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
with self.callback_manager.event(
CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
) as query_event:
sub_questions = await self._question_gen.agenerate(
self._metadatas, query_bundle
)
colors = get_color_mapping([str(i) for i in range(len(sub_questions))])
if self._verbose:
print_text(f"Generated {len(sub_questions)} sub questions.\n")
tasks = [
self._aquery_subq(sub_q, color=colors[str(ind)])
for ind, sub_q in enumerate(sub_questions)
]
qa_pairs_all = await asyncio.gather(*tasks)
qa_pairs_all = cast(List[Optional[SubQuestionAnswerPair]], qa_pairs_all)
# filter out sub questions that failed
qa_pairs: List[SubQuestionAnswerPair> = list(filter(None, qa_pairs_all))
nodes = [self._construct_node(pair) for pair in qa_pairs]
source_nodes = [node for qa_pair in qa_pairs for node in qa_pair.sources]
response = await self._response_synthesizer.asynthesize(
query=query_bundle,
nodes=nodes,
additional_source_nodes=source_nodes,
)
query_event.on_end(payload={EventPayload.RESPONSE: response})
return response
Additionally, ensure that the run_async_tasks
function is used correctly when self._use_async
is set to True
:
if self._use_async:
tasks = [
self._aquery_subq(sub_q, color=colors[str(ind)])
for ind, sub_q in enumerate(sub_questions)
]
qa_pairs_all = run_async_tasks(tasks)
qa_pairs_all = cast(List[Optional[SubQuestionAnswerPair]], qa_pairs_all)
else:
qa_pairs_all = [
self._query_subq(sub_q, color=colors[str(ind)])
for ind, sub_q in enumerate(sub_questions)
]
# filter out sub questions that failed
qa_pairs: List[SubQuestionAnswerPair> = list(filter(None, qa_pairs_all))
nodes = [self._construct_node(pair) for pair in qa_pairs]
source_nodes = [node for qa_pair in qa_pairs for node in qa_pair.sources]
response = self._response_synthesizer.synthesize(
query=query_bundle,
nodes=nodes,
additional_source_nodes=source_nodes,
)
query_event.on_end(payload={EventPayload.RESPONSE: response})
return response
By ensuring that all asynchronous calls are properly awaited and JSON payloads are correctly structured, you should be able to resolve the errors you are encountering [1][2].
To continue talking to Dosu, mention @dosu.
@dosu Still I face the same issue. The snippet of code with correction your provided is same as my code.I'm append the screenshot here.
Screenshot breakdown: AAD,TOLMAR are 2 PDFs. I ingested more than 30 similar PDFs to do RAG. In this case, the answer need to take from PDF AAD, but for processing it tooks TOLMAR as PDF. For processing the query, it took correct tool(PDF), but it misleads before getting final answer it took wrong PDF as tool.
To ensure that the correct PDF is always selected for processing the query in the SubQuestionQueryEngine
using the microsoft/Phi-3-mini-4k-instruct
model in a Retrieval-Augmented Generation (RAG) setup with llama-index
version 0.10.57, you need to properly configure the QueryEngineTool
instances with metadata that accurately describes the content and purpose of each PDF. This metadata will help the SubQuestionQueryEngine
to select the appropriate tool for the query.
Here is an example of how to set up the QueryEngineTool
instances and the SubQuestionQueryEngine
:
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.query_engine import SubQuestionQueryEngine
from llama_index.readers.file.docs import PDFReader
from llama_index.packs.rag_fusion_query_pipeline import RAGFusionPipelinePack
from llama_index.llms.openai import OpenAI
from pathlib import Path
# Load documents
loader = PDFReader()
document_path = Path("./data/101.pdf") # replace with your own document
documents = loader.load_data(file=document_path)
# Create the pack
pack = RAGFusionPipelinePack(documents, llm=OpenAI(model="gpt-3.5-turbo"))
# Assuming vector_index and summary_index are already created from the documents
vector_tool = QueryEngineTool(
vector_index.as_query_engine(),
metadata=ToolMetadata(
name="vector_search",
description="Useful for searching for specific facts in the PDF.",
),
)
summary_tool = QueryEngineTool(
summary_index.as_query_engine(response_mode="tree_summarize"),
metadata=ToolMetadata(
name="summary",
description="Useful for summarizing the entire PDF document.",
),
)
# Apply nest_asyncio to avoid event loop issues
import nest_asyncio
nest_asyncio.apply()
# Create the SubQuestionQueryEngine with the tools
query_engine = SubQuestionQueryEngine.from_defaults(
[vector_tool, summary_tool],
verbose=True,
)
# Example query
response = query_engine.query(
"What was mentioned about Meta? How Does it differ from how OpenAI is talked about?"
)
# Display the response
from llama_index.core.response.notebook_utils import display_response
display_response(response)
By setting the metadata
for each QueryEngineTool
, you provide the SubQuestionQueryEngine
with the necessary information to select the correct PDF for processing the query. Additionally, loading the documents using PDFReader
and creating a RAGFusionPipelinePack
ensures that the documents are correctly processed and integrated into the query pipeline [1][2].
For a more comprehensive setup, you can follow the steps outlined in the Advanced RAG demo using KDB.AI, which includes loading data, chunking, encoding, storing into a vector store, and creating a query engine:
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.vector_stores.kdbai import KDBAIVectorStore
import kdbai_client as kdbai
# Load the data
loader = SimpleDirectoryReader(input_dir="./data")
documents = loader.load_data()
# Chunk, encode, and store into a vector store
session = kdbai.Session(endpoint="http://localhost:8082")
vector_store = KDBAIVectorStore(session.create_table("test_store", schema={
"columns": [
{"name": "document_id", "pytype": "bytes"},
{"name": "text", "pytype": "bytes"},
{"name": "embedding", "vectorIndex": {"type": "flat", "metric": "L2", "dims": 768}},
{"name": "title", "pytype": "bytes"},
{"name": "publication_date", "pytype": "datetime64[ns]"},
]
}))
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(),
HuggingFaceEmbedding(model_name="sentence-transformers/all-mpnet-base-v2"),
],
vector_store=vector_store,
)
_nodes = pipeline.run(documents=documents, num_workers=4)
# Create a VectorStoreIndex
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
# Create a query engine
query_engine = index.as_query_engine()
# Example query
response = query_engine.query("What is DoRA?")
print(response)
This setup ensures that your documents are properly processed and indexed, allowing the SubQuestionQueryEngine
to effectively retrieve and process the correct PDF for any given query [3].
By following these stages and properly configuring your tools and metadata, you can ensure that the correct PDF is always selected for processing the query.
To continue talking to Dosu, mention @dosu.
@logan-markewich Hi Can you provide a solution for this.
@dosu Thanks but, when i selected same PDF and ask another question i got the answer. So the issue is not about setting metadata and the tool. See this is the response, when i printed the tool and its description: ==>The corresponding protocol that is taken from input query: AAD
==>The corresponding engine that is taken from input query: <llama_index.core.query_engine.retriever_query_engine.RetrieverQueryEngine object at 0x7ce6994aa860>
==>The corresponding metadata :ToolMetadata(description='Provides information about AAD', name='AAD', fn_schema=<class 'llama_index.core.tools.types.DefaultToolFnSchema'>, return_direct=False) Up to this are correct but before getting the answer : It took another tool as subquestionengine tool, that causing the Error.
To ensure that the SubQuestionQueryEngine
consistently uses the correct tool throughout the entire query processing and avoids the KeyError
, you need to make sure that the tool_name
used in sub_q
is always present in the _query_engines
dictionary. This can be done by adding a check before accessing the dictionary.
Here is the modified code with the necessary check:
async def _aquery_subq(
self, sub_q: SubQuestion, color: Optional[str] = None
) -> Optional[SubQuestionAnswerPair]:
try:
with self.callback_manager.event(
CBEventType.SUB_QUESTION,
payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
) as event:
question = sub_q.sub_question
if sub_q.tool_name not in self._query_engines:
raise KeyError(f"Tool {sub_q.tool_name} not found in query engines.")
query_engine = self._query_engines[sub_q.tool_name]
if self._verbose:
print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)
response = await query_engine.aquery(question)
response_text = str(response)
if self._verbose:
print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)
qa_pair = SubQuestionAnswerPair(
sub_q=sub_q, answer=response_text, sources=response.source_nodes
)
event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})
return qa_pair
except (ValueError, KeyError) as e:
logger.warning(f"[{sub_q.tool_name}] Failed to run {question}: {e}")
return None
def _query_subq(
self, sub_q: SubQuestion, color: Optional[str] = None
) -> Optional[SubQuestionAnswerPair]:
try:
with self.callback_manager.event(
CBEventType.SUB_QUESTION,
payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
) as event:
question = sub_q.sub_question
if sub_q.tool_name not in self._query_engines:
raise KeyError(f"Tool {sub_q.tool_name} not found in query engines.")
query_engine = self._query_engines[sub_q.tool_name]
if self._verbose:
print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)
response = query_engine.query(question)
response_text = str(response)
if self._verbose:
print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)
qa_pair = SubQuestionAnswerPair(
sub_q=sub_q, answer=response_text, sources=response.source_nodes
)
event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})
return qa_pair
except (ValueError, KeyError) as e:
logger.warning(f"[{sub_q.tool_name}] Failed to run {question}: {e}")
return None
Additionally, you can ensure that the tools are correctly defined and available by using the ToolMetadata
class as shown in the test_llm_question_gen
function from the llama-index-legacy/tests/question_gen/test_llm_generators.py
file:
def test_llm_question_gen(
mock_service_context: ServiceContext,
) -> None:
question_gen = LLMQuestionGenerator.from_defaults(
service_context=mock_service_context
)
tools = [
ToolMetadata(description="data source 1", name="source_1"),
ToolMetadata(description="data source 2", name="source_2"),
]
query = QueryBundle(query_str="What is A and B?")
sub_questions = question_gen.generate(tools=tools, query=query)
assert isinstance(sub_questions[0], SubQuestion)
This ensures that the tools are properly initialized and can be used by the SubQuestionQueryEngine
without causing a KeyError
[1][2].
This is a tough nut to crack, could really use your help on this, @logan-markewich.
To continue talking to Dosu, mention @dosu.
@dosu Thanks,now its not giving me any json dict error or key coroutine error instead it giving me "Empty response", But my doubt is that when i give ask another query from that pdf it gives me the correct answer also when i tried with other open llms that support SubQueryEngine also the result is same, But when it comes to Open API I'm getting the correct answer.
To resolve the "Empty response" issue when using the OpenAI API with the SubQuestionQueryEngine
in a Retrieval-Augmented Generation (RAG) setup, you should ensure that the sub-questions are being generated and processed correctly. Here are some steps and code snippets to help you debug and resolve the issue:
Check Sub-Question Generation: Ensure that sub-questions are being generated correctly by the question_gen
module. Consider using the OpenAIQuestionGenerator
which leverages OpenAI's function calling API to improve robustness and reduce output parsing issues.
Verify Query Execution: Ensure that each sub-question is being executed by the appropriate query engine and that responses are being collected.
Debugging Output: Use verbose logging to print intermediate questions and answers to help identify where the issue might be occurring.
Here is a code snippet that includes additional logging to help debug the issue:
def _query(self, query_bundle: QueryBundle) -> RESPONSE_TYPE:
with self.callback_manager.event(
CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
) as query_event:
sub_questions = self._question_gen.generate(self._metadatas, query_bundle)
colors = get_color_mapping([str(i) for i in range(len(sub_questions))])
if self._verbose:
print_text(f"Generated {len(sub_questions)} sub questions.\n")
if self._use_async:
tasks = [
self._aquery_subq(sub_q, color=colors[str(ind)])
for ind, sub_q in enumerate(sub_questions)
]
qa_pairs_all = run_async_tasks(tasks)
qa_pairs_all = cast(List[Optional[SubQuestionAnswerPair]], qa_pairs_all)
else:
qa_pairs_all = [
self._query_subq(sub_q, color=colors[str(ind)])
for ind, sub_q in enumerate(sub_questions)
]
# filter out sub questions that failed
qa_pairs: List[SubQuestionAnswerPair] = list(filter(None, qa_pairs_all))
if not qa_pairs:
logger.warning("No sub-questions were successfully answered.")
return "Empty response"
nodes = [self._construct_node(pair) for pair in qa_pairs]
source_nodes = [node for qa_pair in qa_pairs for node in qa_pair.sources]
response = self._response_synthesizer.synthesize(
query=query_bundle,
nodes=nodes,
additional_source_nodes=source_nodes,
)
query_event.on_end(payload={EventPayload.RESPONSE: response})
return response
In this snippet, additional logging is added to warn if no sub-questions were successfully answered, which can help identify if the issue lies in the sub-question generation or execution phase.
Additionally, you can refer to the LlamaIndex documentation for building custom query engines, which provides examples of defining custom query engines for RAG setups. This can help you better understand how to structure your query engine and troubleshoot issues:
class QueryPipelineQueryEngine(CustomQueryEngine):
query_pipeline: QueryPipeline = Field(
description="Query Pipeline to use for Q&A.",
)
def custom_query(self, query_str: str) -> RESPONSE_TYPE:
return self.query_pipeline.run(query_str=query_str)
async def acustom_query(self, query_str: str) -> RESPONSE_TYPE:
return await self.query_pipeline.arun(query_str=query_str)
For more detailed examples and setup instructions, you can refer to the LlamaIndex documentation. This notebook showcases how to use OpenAI function calling to improve the robustness of the sub-question query engine and provides a step-by-step guide to implementing and testing the OpenAIQuestionGenerator
[1][2][3][4][5].
To continue talking to Dosu, mention @dosu.
@dosu The error is same, if i remove this line of code that you given : if sub_q.tool_name not in self._query_engines: raise KeyError(f"Tool {sub_q.tool_name} not found in query engines.")
To address the issue without causing a KeyError
and to ensure that the correct tool is consistently used throughout the query processing, you can implement a fallback mechanism instead of raising an error. This way, if the tool is not found, you can handle it gracefully.
Here is the updated code with a fallback mechanism:
async def _aquery_subq(
self, sub_q: SubQuestion, color: Optional[str] = None
) -> Optional[SubQuestionAnswerPair]:
try:
with self.callback_manager.event(
CBEventType.SUB_QUESTION,
payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
) as event:
question = sub_q.sub_question
query_engine = self._query_engines.get(sub_q.tool_name)
if query_engine is None:
logger.warning(f"Tool {sub_q.tool_name} not found in query engines. Skipping this sub-question.")
return None
if self._verbose:
print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)
response = await query_engine.aquery(question)
response_text = str(response)
if self._verbose:
print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)
qa_pair = SubQuestionAnswerPair(
sub_q=sub_q, answer=response_text, sources=response.source_nodes
)
event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})
return qa_pair
except ValueError as e:
logger.warning(f"[{sub_q.tool_name}] Failed to run {question}: {e}")
return None
def _query_subq(
self, sub_q: SubQuestion, color: Optional[str] = None
) -> Optional[SubQuestionAnswerPair]:
try:
with self.callback_manager.event(
CBEventType.SUB_QUESTION,
payload={EventPayload.SUB_QUESTION: SubQuestionAnswerPair(sub_q=sub_q)},
) as event:
question = sub_q.sub_question
query_engine = self._query_engines.get(sub_q.tool_name)
if query_engine is None:
logger.warning(f"Tool {sub_q.tool_name} not found in query engines. Skipping this sub-question.")
return None
if self._verbose:
print_text(f"[{sub_q.tool_name}] Q: {question}\n", color=color)
response = query_engine.query(question)
response_text = str(response)
if self._verbose:
print_text(f"[{sub_q.tool_name}] A: {response_text}\n", color=color)
qa_pair = SubQuestionAnswerPair(
sub_q=sub_q, answer=response_text, sources=response.source_nodes
)
event.on_end(payload={EventPayload.SUB_QUESTION: qa_pair})
return qa_pair
except ValueError as e:
logger.warning(f"[{sub_q.tool_name}] Failed to run {question}: {e}")
return None
This approach ensures that if a tool is not found in the _query_engines
dictionary, the sub-question is skipped, and a warning is logged instead of raising an error. This should help you avoid the KeyError
and ensure that the query processing continues smoothly.
Additionally, ensure that the SubQuestionQueryEngine
is correctly instantiated and that all tools are properly registered in the _query_engines
dictionary. This will help in minimizing the chances of encountering missing tools during query processing [1][2].
To continue talking to Dosu, mention @dosu.
Bug Description
I'm doing RAG using llama-index.The model is Phi3-mini-4k. I have experimented all the models that supports sub-queryengine. When comparing those models, I got pretty good results from Phi3-mini-4k. Even though there are issues like JSON error and Key co-routine error. For same input, I got these errors alternatively. Point to be note I have only ingested 20 documents to do this.
Version
llama-index == 0.10.57
Steps to Reproduce
You can take this as your example to generate the same errors: https://docs.llamaindex.ai/en/stable/examples/query_engine/sub_question_query_engine/ (Please use Open source LLM and LLM question generator for the purpose)
Relevant Logs/Tracbacks