erlebach opened this issue 1 month ago
The `AttributeError: 'list' object has no attribute 'message'` error occurs because the `response` object returned by `pipeline.run` is a dictionary, not an object with a `message` attribute. You should access the response message using the dictionary key.
Replace:

```python
pipeline_memory.put(response.message)
print(str(response.message))
```

with:

```python
pipeline_memory.put(response["response"])
print(str(response["response"]))
```
Here is the corrected code snippet:
```python
user_inputs = [
    "Hello!",
    "How does tool-use work with Claude-3 work?",
    "What models support it?",
    "Thanks, that what I needed to know!",
]

for msg in user_inputs:
    # get memory
    chat_history = pipeline_memory.get()

    # prepare inputs
    chat_history_str = "\n".join([str(x) for x in chat_history])

    # run pipeline
    response = pipeline.run(
        query_str=msg,
        chat_history=chat_history,
        chat_history_str=chat_history_str,
    )

    # update memory
    user_msg = ChatMessage(role="user", content=msg)
    pipeline_memory.put(user_msg)
    print(str(user_msg))

    pipeline_memory.put(response["response"])
    print(str(response["response"]))
    print()
```
Additionally, ensure that your `query_configs` are correctly set up and that you are not passing a list where an object with specific attributes is expected. For example, an error like `AttributeError: 'list' object has no attribute 'partial_format'` indicates that a list is being used where a template object is expected. Make sure your configurations align with the expected structure [1][2][3].
To continue talking to Dosu, mention @dosu.
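As an aside: at different points in this thread, `pipeline.run` comes back as an object with a `.message` attribute, as a dict, and as a list. While debugging, a small defensive normalizer can make the shape explicit. This is a hypothetical helper, not part of LlamaIndex; the three shapes it handles are assumptions drawn from this thread.

```python
# Hypothetical debugging helper (not a LlamaIndex API): normalize a
# pipeline result whose concrete shape may vary between configurations.
def extract_message(result):
    if isinstance(result, list):
        # some configurations return a list; inspect the first element
        result = result[0]
    if isinstance(result, dict):
        # dict keyed by output name, e.g. {"response": ...}
        return result.get("response", result)
    # fall back to an object exposing a .message attribute
    return getattr(result, "message", result)
```

Printing `type(response)` before reaching for a helper like this is still the quickest way to see which case you are actually in.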
@dosubot, the error mentions that `response` is a list, which I confirmed; it is not a dict. However, `response[0].message` does not exist. Also, note that my code was taken directly from the example notebook at the above link. This is part of the LlamaIndex examples. How this example could have worked is not clear.
The error you're encountering suggests that the `response` object is a list, not a dictionary. This indicates that the `pipeline.run` method is returning a list of responses rather than a single response dictionary. To handle this, you should iterate over the list and process each response individually. Here's how you can modify your code:
```python
user_inputs = [
    "Hello!",
    "How does tool-use work with Claude-3 work?",
    "What models support it?",
    "Thanks, that what I needed to know!",
]

for msg in user_inputs:
    # get memory
    chat_history = pipeline_memory.get()

    # prepare inputs
    chat_history_str = "\n".join([str(x) for x in chat_history])

    # run pipeline
    responses = pipeline.run(
        query_str=msg,
        chat_history=chat_history,
        chat_history_str=chat_history_str,
    )

    # update memory
    user_msg = ChatMessage(role="user", content=msg)
    pipeline_memory.put(user_msg)
    print(str(user_msg))

    # Iterate over the list of responses
    for response in responses:
        pipeline_memory.put(response["response"])
        print(str(response["response"]))

    print()
```
This change ensures that you correctly handle the list of responses returned by the `pipeline.run` method. If the `response` object is indeed a list, this approach will process each response in the list individually.
After implementing your change, @dosu, I get the error:
```
ValueError                                Traceback (most recent call last)
Cell In[36], line 18
     13 chat_history_str = "\n".join([str(x) for x in chat_history])
     15 # run pipeline
     16 # response, intermediates = pipeline.run_with_intermediates(
     17 # response = pipeline.run_with_intermediates(
---> 18 response = pipeline.run(
     19     query_str=msg,
     20     chat_history=chat_history,
     21     chat_history_str=chat_history_str,
     22 )
     24 print("================================================")
     25 print(f"==> {type(response)=}")

File ~/src/2024/llama_index_gordon/basics/.venv/lib/python3.12/site-packages/llama_index/core/query_pipeline/query.py:410, in QueryPipeline.run(self, return_values_direct, callback_manager, batch, *args, **kwargs)
    406 query_payload = json.dumps(str(kwargs))
    407 with self.callback_manager.event(
    408     CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_payload}
    409 ) as query_event:
--> 410     outputs, _ = self._run(
    411         *args,
    412         return_values_direct=return_values_direct,
    413         show_intermediates=False,
    414         batch=batch,
    415         **kwargs,
    416     )
    418 return outputs

File ~/src/2024/llama_index_gordon/basics/.venv/lib/python3.12/site-packages/llama_index/core/query_pipeline/query.py:779, in QueryPipeline._run(self, return_values_direct, show_intermediates, batch, *args, **kwargs)
    773 else:
    774     result_outputs, intermediates = self._run_multi(
    775         {root_key: kwargs}, show_intermediates=show_intermediates
    776     )
    778 return (
--> 779     self._get_single_result_output(result_outputs, return_values_direct),
    780     intermediates,
    781 )

File ~/src/2024/llama_index_gordon/basics/.venv/lib/python3.12/site-packages/llama_index/core/query_pipeline/query.py:713, in QueryPipeline._get_single_result_output(self, result_outputs, return_values_direct)
    706 """Get result output from a single module.
    707
    708 If output dict is a single key, return the value directly
    709 if return_values_direct is True.
    710
    711 """
    712 if len(result_outputs) != 1:
--> 713     raise ValueError("Only one output is supported.")
    715 result_output = next(iter(result_outputs.values()))
    716 # return_values_direct: if True, return the value directly
    717 # without the key
    718 # if it's a dict with one key, return the value

ValueError: Only one output is supported.
```
Note line 410:

```
--> 410     outputs, _ = self._run(
```

`self.run` only supports a single output. Yet, on the left-hand side, there are at least TWO outputs. This is library code, not my code.

Question: has anybody run the Query Pipeline Chat Engine? Perhaps point me to the author so I can ask him directly? Here is the link again. Thanks.
The `ValueError: Only one output is supported` error occurs because the `QueryPipeline` class is designed to handle only a single root and a single output module when using the `run` method. This error indicates that your pipeline configuration might have multiple roots or outputs, which is not supported by the `run` method.

To resolve this issue, you need to ensure that your pipeline has only one root module and one output module. If you need to handle multiple roots or outputs, you should use the `run_multi` method instead of `run`.
Here is an example of how to configure a pipeline with a single root and output:
```python
pipeline = QueryPipeline(
    chain=[
        QueryComponent1(),  # root module
        QueryComponent2(),  # intermediate module
        QueryComponent3(),  # output module
    ]
)

result = pipeline.run(query_str="Your query here")
```
If you have multiple roots or outputs, use `run_multi`:
```python
pipeline = QueryPipeline(
    modules={
        "root1": QueryComponent1(),
        "root2": QueryComponent2(),
        "output": QueryComponent3(),
    },
    links=[
        Link(src="root1", dest="output"),
        Link(src="root2", dest="output"),
    ],
)

result = pipeline.run_multi({
    "root1": {"input1": 1, "input2": 2},
    "root2": {"input1": 3, "input2": 4},
})
```
By following these guidelines, you should be able to resolve the `ValueError` and successfully run the `QueryPipeline`. Additionally, you can refer to the `test_query_pipeline_multi_intermediate_output` function in the `llama-index-legacy/tests/query_pipeline/test_query.py` file for an example of handling multiple intermediate outputs using `run_multi_with_intermediates` [1][2][3][4].
This is a tough nut to crack, could really use your help on this, @logan-markewich.
@dosu : Here is the pipeline:
```python
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm_c,
        "rewrite_retriever": retriever,
        "query_retriever": retriever,
        "join": argpack_component,
        "reranker": reranker,
        "response_component": response_component,
    },
    verbose=True,
)

# run both retrievers -- once with the hallucinated query, once with the real query
pipeline.add_link(
    "input", "rewrite_template", src_key="query_str", dest_key="query_str"
)
pipeline.add_link(
    "input",
    "rewrite_template",
    src_key="chat_history_str",
    dest_key="chat_history_str",
)
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")

# each input to the argpack component needs a dest key -- it can be anything
# then, the argpack component will pack all the inputs into a single list
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")

# reranker needs the packed nodes and the query string
pipeline.add_link("join", "reranker", dest_key="nodes")
pipeline.add_link(
    "input", "reranker", src_key="query_str", dest_key="query_str"
)

# synthesizer needs the reranked nodes and query str
pipeline.add_link("reranker", "response_component", dest_key="nodes")
pipeline.add_link(
    "input", "response_component", src_key="query_str", dest_key="query_str"
)
pipeline.add_link(
    "input",
    "response_component",
    src_key="chat_history",
    dest_key="chat_history",
)
```
Does this look like a multi-input or multi-output pipeline to you?
Your pipeline configuration appears to be correctly set up with a single root and a single output module. However, the `ValueError: Only one output is supported` error suggests that the `result_outputs` dictionary contains more than one key at some point in the pipeline execution.
To resolve this issue, ensure that the pipeline produces only one final output. Here are a few steps you can take:
Check the Pipeline Configuration: Ensure that the pipeline is configured to have only one final output module. If you have multiple output modules, you need to consolidate them into one.
Modify the Pipeline: If you have multiple modules producing outputs, you might need to add an additional step to combine these outputs into a single output.
Debugging: Use the `print_debug_input` and `print_debug_input_multi` functions to print the inputs and outputs at each stage of the pipeline to ensure that only one output is being produced.
Here is an example of how you might modify the pipeline to ensure a single output:
```python
# Example of combining multiple outputs into a single output
def combine_outputs(output1, output2):
    # Logic to combine outputs
    combined_output = {**output1, **output2}
    return combined_output

# Assuming you have two modules producing outputs
output1 = module1.run(input_data)
output2 = module2.run(input_data)

# Combine the outputs
final_output = combine_outputs(output1, output2)

# Ensure the final output is a single dictionary
result_outputs = {'final_output': final_output}
```
By ensuring that the `result_outputs` dictionary contains only one key, you can avoid the `ValueError: Only one output is supported` error. Additionally, make sure that the root module and the final output module are correctly configured to handle a single input and output, respectively, as shown in the `QueryPipeline` class implementation [1][2][3].
@dosu: To be clear: what does "single output" actually mean? It obviously means something different than a single object, since you appear to imply that a dictionary (single object) with two keys is problematic. Where did you conclude this from?
In the context of the `QueryPipeline`, "single output" means that the pipeline should produce a single result at the end of its execution. This result should be a single dictionary with one key-value pair, where the key is the output identifier and the value is the output data.

The `ValueError: Only one output is supported` error occurs when the `result_outputs` dictionary contains more than one key. This is enforced in the `_get_single_result_output` method, which checks that the length of `result_outputs` is exactly one:
```python
def _get_single_result_output(self, result_outputs, return_values_direct):
    if len(result_outputs) != 1:
        raise ValueError("Only one output is supported.")
    result_output = next(iter(result_outputs.values()))
    return result_output
```
This means that the pipeline should be configured in such a way that all intermediate results are consolidated into a single final output before the pipeline completes. If your pipeline produces multiple outputs, you need to combine them into a single output dictionary with one key.
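The length check described above is easy to reproduce in isolation. This standalone sketch mirrors the snippet quoted from `query.py` (minus `self` and the `return_values_direct` handling) to make the failure mode concrete:

```python
# Standalone mirror of the library's length check, for illustration only.
def get_single_result_output(result_outputs: dict):
    if len(result_outputs) != 1:
        raise ValueError("Only one output is supported.")
    return next(iter(result_outputs.values()))

# One sink module -> fine:
print(get_single_result_output({"response_component": "final answer"}))

# Two sink modules -> the exact error from the traceback above:
try:
    get_single_result_output({"rewrite_retriever": [], "query_retriever": []})
except ValueError as e:
    print(e)  # Only one output is supported.
```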
For example, if you have multiple modules producing outputs, you should combine these outputs into a single dictionary with one key:
```python
# Example of combining multiple outputs into a single output
def combine_outputs(output1, output2):
    # Logic to combine outputs
    combined_output = {**output1, **output2}
    return combined_output

# Assuming you have two modules producing outputs
output1 = module1.run(input_data)
output2 = module2.run(input_data)

# Combine the outputs
final_output = combine_outputs(output1, output2)

# Ensure the final output is a single dictionary with one key
result_outputs = {'final_output': final_output}
```
By ensuring that the `result_outputs` dictionary contains only one key, you can avoid the `ValueError: Only one output is supported` error. This approach ensures that the pipeline produces a single consolidated output, which is what the `QueryPipeline` expects [1][2].
Thanks, @dosu. This was incredibly helpful. I have not found this behavior (the output being a dict object with a single entry) documented anywhere. That is hardly obvious. I wasted quite a lot of time on this.
I'm glad I could help clarify the issue. The requirement for the output to be a dictionary with a single key-value pair is indeed not always intuitive and can be easily overlooked. It's great that you brought this up, as it highlights an area where the documentation could be improved to save others from similar confusion.
If you have any more questions or run into further issues, feel free to ask. Happy coding!
@dosu, I hope that the maintainers and developers of LlamaIndex read the user comments and update the documentation as needed.
It looks like the issue stems from the `QueryPipeline` class's requirement that the final output must be a single dictionary with one key-value pair. This is enforced in the `_get_single_result_output` method, which raises a `ValueError` if the `result_outputs` dictionary contains more than one key.
To resolve this, you need to ensure that your pipeline produces a single consolidated output. Here’s a summary of the steps you can take:
Ensure Single Output: The pipeline should be configured to produce a single final output. This means combining any intermediate results into a single dictionary with one key.
Combine Outputs: If you have multiple modules producing outputs, combine them into a single output dictionary. Here’s an example:
```python
# Example of combining multiple outputs into a single output
def combine_outputs(output1, output2):
    # Logic to combine outputs
    combined_output = {**output1, **output2}
    return combined_output

# Assuming you have two modules producing outputs
output1 = module1.run(input_data)
output2 = module2.run(input_data)

# Combine the outputs
final_output = combine_outputs(output1, output2)

# Ensure the final output is a single dictionary with one key
result_outputs = {'final_output': final_output}
```
Pipeline Configuration: Ensure that your pipeline configuration aligns with this requirement. Here’s an example of a correctly configured pipeline:
```python
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm_c,
        "rewrite_retriever": retriever,
        "query_retriever": retriever,
        "join": argpack_component,
        "reranker": reranker,
        "response_component": response_component,
    },
    verbose=True,
)

# run both retrievers -- once with the hallucinated query, once with the real query
pipeline.add_link(
    "input", "rewrite_template", src_key="query_str", dest_key="query_str"
)
pipeline.add_link(
    "input",
    "rewrite_template",
    src_key="chat_history_str",
    dest_key="chat_history_str",
)
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")

# each input to the argpack component needs a dest key -- it can be anything
# then, the argpack component will pack all the inputs into a single list
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")

# reranker needs the packed nodes and the query string
pipeline.add_link("join", "reranker", dest_key="nodes")
pipeline.add_link(
    "input", "reranker", src_key="query_str", dest_key="query_str"
)

# synthesizer needs the reranked nodes and query str
pipeline.add_link("reranker", "response_component", dest_key="nodes")
pipeline.add_link(
    "input", "response_component", src_key="query_str", dest_key="query_str"
)
pipeline.add_link(
    "input",
    "response_component",
    src_key="chat_history",
    dest_key="chat_history",
)
```
By following these steps, you should be able to avoid the `ValueError` and successfully run the `QueryPipeline`. If you have any more questions or run into further issues, feel free to ask. Happy coding!
@dosu, I am still debugging. The following pipeline is fine:
```python
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm_c,
        "rewrite_retriever": retriever,
    },
    verbose=True,
)

# run both retrievers -- once with the hallucinated query, once with the real query
pipeline.add_link(
    "input", "rewrite_template", src_key="query_str", dest_key="query_str"
)
pipeline.add_link(
    "input",
    "rewrite_template",
    src_key="chat_history_str",
    dest_key="chat_history_str",
)
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
```
If I add the `query_retriever` module to the pipeline and add the link

```python
pipeline.add_link("input", "query_retriever", src_key="query_str")
```

I get two outputs: `["rewrite_retriever", "query_retriever"]`. Any ideas?
The issue of getting multiple outputs when adding the `query_retriever` component to your pipeline is likely due to the way the pipeline is configured to handle outputs from multiple retrievers. When you add the `query_retriever` component, the pipeline ends up with two output-producing components: `rewrite_retriever` and `query_retriever`.
To resolve this, you need to ensure that the outputs from both retrievers are combined into a single output before the pipeline completes. Here's how you can modify your pipeline to achieve this:

Combine Outputs: Use an additional component to combine the outputs from `rewrite_retriever` and `query_retriever` into a single output.

Add a Combining Component: Create a component that takes the outputs from both retrievers and combines them.
Here’s an example of how you can modify your pipeline:
```python
# Define a combining component
class CombineOutputsComponent(QueryComponent):
    def _run_component(self, **kwargs: Any) -> Any:
        rewrite_output = kwargs.get("rewrite_output")
        query_output = kwargs.get("query_output")
        combined_output = {
            "rewrite_output": rewrite_output,
            "query_output": query_output,
        }
        return {"combined_output": combined_output}

# Initialize the combining component
combine_outputs_component = CombineOutputsComponent()

# Create the pipeline
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm_c,
        "rewrite_retriever": retriever,
        "query_retriever": retriever,
        "combine_outputs": combine_outputs_component,
    },
    verbose=True,
)

# Add links to the pipeline
pipeline.add_link("input", "rewrite_template", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "rewrite_template", src_key="chat_history_str", dest_key="chat_history_str")
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")

# Add links to combine the outputs
pipeline.add_link("rewrite_retriever", "combine_outputs", dest_key="rewrite_output")
pipeline.add_link("query_retriever", "combine_outputs", dest_key="query_output")

# Now the final output will be from the combine_outputs component
```
By adding the `CombineOutputsComponent` and linking the outputs from both retrievers to it, you ensure that the pipeline produces a single consolidated output. This should resolve the issue of getting multiple outputs.
If you continue to experience issues, make sure to check the configuration of each component and ensure that they are correctly handling their inputs and outputs [1][2].
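The "two outputs" reported above can be reproduced with a small graph sketch: in a DAG-based pipeline like this one, any module that is never the source of a link is a sink, and every sink becomes an output. This is plain Python for illustration, not LlamaIndex code; the module and link names are taken from the truncated pipeline quoted earlier in the thread.

```python
# Sketch: compute the sink (output) modules of the truncated pipeline.
links = [
    ("input", "rewrite_template"),
    ("rewrite_template", "llm"),
    ("llm", "rewrite_retriever"),
    ("input", "query_retriever"),
]
modules = {"input", "rewrite_template", "llm",
           "rewrite_retriever", "query_retriever"}

sources = {src for src, _ in links}
sinks = modules - sources  # modules with no outgoing link
print(sorted(sinks))  # ['query_retriever', 'rewrite_retriever']
```

With the full pipeline (the `join`, `reranker`, and `response_component` modules and their links restored), the only sink is `response_component`, which is why the complete configuration satisfies the single-output check.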
@dosu, I think I found the error:
```python
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm_c,
        "rewrite_retriever": retriever,
        "query_retriever": retriever,
        "join": argpack_component,
        "reranker": reranker,
        "response_component": response_component,
    },
    verbose=True,
)
```
Notice that the `retriever` value appears twice in the modules dict. Is that allowed? It matches perfectly with the multiple outputs. Perhaps the retriever should have been two different instances of the Retriever class?
Yes, it is allowed to have the same instance of a component (in this case, `retriever`) appear multiple times in the modules dict. However, if you are encountering issues with multiple outputs, it is recommended to use two different instances of the `Retriever` class instead.

To correctly configure the pipeline to avoid the multiple-outputs issue, you should instantiate two separate `Retriever` objects and use them in your pipeline configuration. Here is an example of how you can do this:
```python
from llama_index.core.base.base_retriever import BaseRetriever
from llama_index.core.callbacks import CallbackManager

# Create two different instances of the Retriever class
# (note: BaseRetriever is abstract; in practice, create two concrete
# retriever instances, e.g. with index.as_retriever())
retriever1 = BaseRetriever(callback_manager=CallbackManager(), verbose=True)
retriever2 = BaseRetriever(callback_manager=CallbackManager(), verbose=True)

# Configure your pipeline with the two different retriever instances
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm_c,
        "rewrite_retriever": retriever1,
        "query_retriever": retriever2,
        "join": argpack_component,
        "reranker": reranker,
        "response_component": response_component,
    },
    verbose=True,
)

# Add links to the pipeline
pipeline.add_link("input", "rewrite_template", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "rewrite_template", src_key="chat_history_str", dest_key="chat_history_str")
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")

# each input to the argpack component needs a dest key -- it can be anything
# then, the argpack component will pack all the inputs into a single list
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")

# reranker needs the packed nodes and the query string
pipeline.add_link("join", "reranker", dest_key="nodes")
pipeline.add_link("input", "reranker", src_key="query_str", dest_key="query_str")

# synthesizer needs the reranked nodes and query str
pipeline.add_link("reranker", "response_component", dest_key="nodes")
pipeline.add_link("input", "response_component", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "response_component", src_key="chat_history", dest_key="chat_history")
```
By using two different instances, you ensure that each retriever operates independently, which should help avoid issues related to multiple outputs [1].
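Whether a shared instance actually collapses into one node depends on whether the pipeline's internal bookkeeping keys modules by name or by object identity. This toy sketch (plain Python, not LlamaIndex code) shows the difference between the two schemes:

```python
# Toy illustration: one object registered under two names.
retriever = object()  # stand-in for a shared retriever instance
by_name = {"rewrite_retriever": retriever, "query_retriever": retriever}

# Name-keyed bookkeeping sees two distinct graph nodes:
print(len(by_name))  # 2

# Identity-keyed bookkeeping would merge them into one:
by_identity = {id(obj): name for name, obj in by_name.items()}
print(len(by_identity))  # 1
```

If the library keys modules by name, reusing one instance under two names is harmless; only identity-keyed bookkeeping would conflate the two nodes. Using two instances sidesteps the question either way.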
@dosu, I understand the end of the pipeline result output is a dictionary with a single entry. But the intermediate sections of the pipeline do not have this restriction. Outputs can have multiple values. Why was it necessary to separate the cases for single and multiple outputs for the query pipeline? After all, the single output case is a multiple output case of length 1. You appear to have complexified the code unnecessarily.
The separation of cases for single and multiple outputs in the query pipeline is necessary due to the different handling and validation requirements for each scenario. Specifically, the single output case assumes there is only one root and one output, which simplifies the validation and processing logic. In contrast, the multiple output case involves handling multiple roots and outputs, which requires more complex validation and processing.
Supporting evidence from the code:
Single Output Case: The `_get_root_key_and_kwargs` method assumes there is only one root and performs specific validations to ensure this:
root_keys = self._get_root_keys()
if len(root_keys) != 1:
    raise ValueError("Only one root is supported.")
The `_get_single_result_output` method ensures that only one output is supported and handles the return value accordingly:
if len(result_outputs) != 1:
    raise ValueError("Only one output is supported.")
Multiple Output Case: The `run_multi` method handles multiple roots and performs batch processing if required:
if batch:
outputs = {}
batch_lengths = {
len(values)
for subdict in module_input_dict.values()
for values in subdict.values()
}
if len(batch_lengths) != 1:
raise ValueError("Length of batch inputs must be the same.")
# List individual outputs from batch multi input.
inputs = [
{
key: {
inner_key: inner_val[i]
for inner_key, inner_val in value.items()
}
for key, value in module_input_dict.items()
}
for i in range(batch_size)
]
jobs = [self._arun_multi(input) for input in inputs]
results = asyncio_run(run_jobs(jobs, workers=len(jobs)))
for result in results:
outputs = self.merge_dicts(outputs, result[0])
return outputs
else:
outputs, _ = self._run_multi(module_input_dict)
return outputs
Additionally, the test cases in the `llama-index-core/tests/query_pipeline/test_query.py` file demonstrate the necessity of handling single and multiple outputs differently. For example, the `test_query_pipeline_multi_batch` and `test_query_pipeline_multi_intermediate_output` functions show how multiple outputs are processed and validated, while the `test_query_pipeline_single_arg_inp` function shows the simpler handling of a single output.
Thus, the separation is necessary to handle the different complexities and requirements of single and multiple output scenarios effectively [1][2].
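Put differently, the single-output case really is the length-1 multi-output case plus a convenience unwrap. A minimal sketch (illustrative only, not the LlamaIndex source) of the distinction:

```python
# Illustrative sketch: the multi-output path returns the whole
# {module_key: output} dict, while the single-output path adds a
# validation + unwrap step so callers get the bare value.
def get_multi_result(result_outputs: dict) -> dict:
    return result_outputs  # caller indexes by module key

def get_single_result(result_outputs: dict):
    if len(result_outputs) != 1:
        raise ValueError("Only one output is supported.")
    return next(iter(result_outputs.values()))  # unwrap the lone value
```

Seen this way, the extra code path buys ergonomics (callers get the value, not a one-entry dict) at the cost of a second validation branch.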
@dosu thanks. Next question: consider the pipeline defined by:
pipeline = QueryPipeline(
modules={
"input": input_component,
"rewrite_template": rewrite_template,
"llm": llm_c,
"rewrite_retriever": retriever1,
"query_retriever": retriever2,
"join": argpack_component,
"reranker": reranker,
"response_component": response_component,
},
verbose=True,
)
pipeline.add_link( "input", "rewrite_template", src_key="query_str", dest_key="query_str" ) pipeline.add_link( "input", "rewrite_template", src_key="chat_history_str", dest_key="chat_history_str", ) pipeline.add_link("rewrite_template", "llm") pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes") pipeline.add_link("query_retriever", "join", dest_key="query_nodes")
pipeline.add_link("join", "reranker", dest_key="nodes") pipeline.add_link( "input", "reranker", src_key="query_str", dest_key="query_str" )
pipeline.add_link("reranker", "response_component", dest_key="nodes") pipeline.add_link( "input", "response_component", src_key="query_str", dest_key="query_str" ) pipeline.add_link( "input", "response_component", src_key="chat_history", dest_key="chat_history", )
I added debugging print statements to query.py, in `get_next_module_keys`:
def get_next_module_keys(self, run_state: RunState) -> List[str]:
"""Determine the next module keys to run based on the current state."""
next_module_keys = []
for module_key, module_input in run_state.all_module_inputs.items():
if module_key in run_state.executed_modules:
continue # Module already executed
# requires input keys not yet used up
required_keys = self.module_dict[module_key].free_req_input_keys
# collect the module keys whose required-but-unused input keys
# are all present in module_input
if all(key in module_input for key in required_keys):
next_module_keys.append(module_key)
# First time, return ['input', 'join']
# 2nd time, return ['rewrite_template', 'query_retriever', 'reranker']
# 3rd time: return ['llm', 'response_component']
print(f"GE: return {next_module_keys=}")
return next_module_keys
and got the output:
889 edge_list = list(self.dag.edges(module_key, data=True))
GE: self._run_multi
GE ==> enter _run_multi
GE ==> self._validate_inputs
GE, RunState, list(module_dict.keys())=['input', 'rewrite_template', 'llm', 'rewrite_retriever', 'query_retriever', 'join', 'reranker', 'response_component']
GE, RunState, 1, list(self.all_module_inputs.items())=[('input', {}), ('rewrite_template', {}), ('llm', {}), ('rewrite_retriever', {}), ('query_retriever', {}), ('join', {}), ('reranker', {}), ('response_component', {})]
GE, RunState, 2, list(self.all_module_inputs.items())=[('input', {'query_str': 'Hello!', 'chat_history': [], 'chat_history_str': ''}), ('rewrite_template', {}), ('llm', {}), ('rewrite_retriever', {}), ('query_retriever', {}), ('join', {}), ('reranker', {}), ('response_component', {})]
GE ==> run_state: <llama_index.core.query_pipeline.query.RunState object at 0x391c2cce0>
GE: return next_module_keys=['input', 'join']
GE ==> next_module_keys: ['input', 'join']
Please explain how I could possibly get ['input', 'join'] as a pair. These two do not appear together in any of the links that define the network. In my opinion, there is some kind of bug in `query.py`. Why don't you first check that the pipeline is configured correctly before running it? Don't you expect every pair to be executed at some point during `pipeline.run()`?
@logan, you might be interested in this as well.
After experimenting a bit, I don't see why you can't define a pipeline as follows (not ideal). First define the modules:
modules={
"input": [input_component, {A, B, C}],
"rewrite_template": [rewrite_template, {B, C}],
"llm": [llm_c, A},
"rewrite_retriever": retriever1, ....
"query_retriever": retriever2,
"join": argpack_component,
"reranker": reranker,
"response_component": response_component,
},
I have added to each module name a list of the arguments expected by the module. In fact, it would be even more general to specify the inputs and outputs expected by each module. The pair of pipeline links would then be rewritten as follows:
Replace:
pipeline.add_link( "input", "rewrite_template", src_key="query_str", dest_key="query_str" ) pipeline.add_link( "input", "rewrite_template", src_key="chat_history_str", dest_key="chat_history_str", )
which is a description of two edges of the DAG, replaced by a single command that describes both the incoming and outgoing links. Something written (symbolically, for the purposes of this message) as:
pipeline.add_link( {'input': [msg,src2]}, {'rewrite_template': [dest1, msg, dest3]} )
Even more general: replace `[dest1, msg, dest3]` by `[query_str=dest1, chat_str=msg, chat_templ=dest3]`. Again, this is a 100% manufactured example. The idea is to make the pipeline self-documenting, which at the moment, it most certainly is not.
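As a rough sketch of this idea (all names here are invented for illustration, not LlamaIndex API), declaring each module's expected input keys would let the pipeline be validated before it runs:

```python
from dataclasses import dataclass, field

# Hypothetical spec: a module plus the input keys it expects.
@dataclass
class ModuleSpec:
    component: object
    expects: set = field(default_factory=set)

def validate_links(modules: dict, links: list, roots=("input",)) -> None:
    """links: [(src, dest, dest_key)] tuples. Every expected key of a
    non-root module must be fed by some link, or validation fails."""
    for name, spec in modules.items():
        if name in roots:
            continue
        fed = {key for (_, dest, key) in links if dest == name}
        missing = spec.expects - fed
        if missing:
            raise ValueError(f"module {name!r} never receives {sorted(missing)}")
```

With such declarations, a mis-wired link would fail at construction time rather than partway through a run.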
The reason you are seeing `['input', 'join']` as the next modules to execute is due to the way the `get_next_module_keys` method determines which modules are ready to run based on their required input keys. Here's a detailed explanation of how this works:
Initialization: When the pipeline starts, the `RunState` is initialized with all modules and their inputs. Initially, only the `input` module has its inputs populated.
Determining Next Modules: The `get_next_module_keys` method iterates through all modules to check if they have all their required inputs. If a module has all its required inputs, it is added to the list of next modules to execute.
First Execution:
- The `input` module is ready to run because it has its required inputs (`query_str`, `chat_history`, and `chat_history_str`).
- The `join` module is also ready to run because it does not have any required inputs initially (it will receive inputs from `rewrite_retriever` and `query_retriever` later).

Subsequent Executions: After the first execution, the pipeline updates the `RunState` with the outputs of the executed modules. The `get_next_module_keys` method is called again to determine the next set of modules to run based on the updated state.
The debug output you provided shows the state of the pipeline at different stages:
Initial State:
GE, RunState, list(module_dict.keys())=['input', 'rewrite_template', 'llm', 'rewrite_retriever', 'query_retriever', 'join', 'reranker', 'response_component']
GE, RunState, 1, list(self.all_module_inputs.items())=[('input', {}), ('rewrite_template', {}), ('llm', {}), ('rewrite_retriever', {}), ('query_retriever', {}), ('join', {}), ('reranker', {}), ('response_component', {})]
After Populating Inputs for the `input` Module:
GE, RunState, 2, list(self.all_module_inputs.items())=[('input', {'query_str': 'Hello!', 'chat_history': [], 'chat_history_str': ''}), ('rewrite_template', {}), ('llm', {}), ('rewrite_retriever', {}), ('query_retriever', {}), ('join', {}), ('reranker', {}), ('response_component', {})]
Next Modules to Execute:
GE: return next_module_keys=['input', 'join']
Why `['input', 'join']`?
- `input`: This module is ready to run because it has all its required inputs.
- `join`: This module is ready to run because it does not have any required inputs initially.

To make the pipeline more self-documenting and easier to understand, you can use a more structured approach to define the modules and their links. Here's an example of how you might configure the pipeline in a more self-documenting way:
from llama_index.legacy.query_pipeline.query_component import InputModule, ProcessModule, JoinModule, OutputModule, Link, QueryPipeline
# Define the modules with their expected inputs and outputs
modules = {
"input": InputModule(),
"rewrite_template": ProcessModule(),
"llm": ProcessModule(),
"rewrite_retriever": ProcessModule(),
"query_retriever": ProcessModule(),
"join": JoinModule(),
"reranker": ProcessModule(),
"response_component": OutputModule()
}
# Define the links between modules
links = [
Link(src="input", dest="rewrite_template", src_key="query_str", dest_key="query_str"),
Link(src="input", dest="rewrite_template", src_key="chat_history_str", dest_key="chat_history_str"),
Link(src="rewrite_template", dest="llm"),
Link(src="llm", dest="rewrite_retriever"),
Link(src="input", dest="query_retriever", src_key="query_str"),
Link(src="rewrite_retriever", dest="join", dest_key="rewrite_nodes"),
Link(src="query_retriever", dest="join", dest_key="query_nodes"),
Link(src="join", dest="reranker", dest_key="nodes"),
Link(src="input", dest="reranker", src_key="query_str", dest_key="query_str"),
Link(src="reranker", dest="response_component", dest_key="nodes"),
Link(src="input", dest="response_component", src_key="query_str", dest_key="query_str"),
Link(src="input", dest="response_component", src_key="chat_history", dest_key="chat_history")
]
# Initialize the pipeline
pipeline = QueryPipeline(modules=modules, links=links, verbose=True)
# Run the pipeline
result = pipeline.run(query_str="Hello!", chat_history=[], chat_history_str="")
This configuration clearly shows the modules involved and the flow of data between them, making the pipeline more self-documenting [1][2].
@Dosu You write:
Why ['input', 'join']?
input: This module is ready to run because it has all its required inputs.
join: This module is ready to run because it does not have any required inputs initially.
So precisely what will run first? Isn't it the case that what is run are the individual links of the pipeline? If not, what is being run? What does it mean to run a module?
For that matter, what does Link(src="input", dest="rewrite_template", src_key="query_str", dest_key="query_str"),
actually mean? It should mean:
1) run the input module
2) take the output of the input module and make it the input of the rewrite template.
Why is it that execution cannot follow the order specified in the link definition? Alternatively, isn't it possible to specify the chain either directly or via links in a way that makes it clear what order they will be executed in? In Langchain, one writes:
A | B | C
so it is obvious that A is executed before B, which is executed before C. This notation could be generalized.
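For what it's worth, the `A | B | C` style can be sketched in plain Python with operator overloading (hypothetical names, not the LangChain or LlamaIndex API):

```python
# Minimal pipe-notation sketch: each Step wraps a function, and `|`
# concatenates steps so execution order matches reading order.
class Step:
    def __init__(self, fn):
        self.fns = [fn]

    def __or__(self, other: "Step") -> "Step":
        combined = Step.__new__(Step)
        combined.fns = self.fns + other.fns
        return combined

    def run(self, value):
        for fn in self.fns:  # left-to-right, as written
            value = fn(value)
        return value

A = Step(str.upper)
B = Step(lambda s: s + "!")
C = Step(lambda s: s * 2)
pipeline = A | B | C
print(pipeline.run("hi"))  # HI!HI!
```

This only covers linear chains; a DAG with fan-out and fan-in (like the `join` step above) still needs named links, which is presumably why the `add_link` API exists.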
In the context of the `QueryPipeline`, "running a module" means executing a specific component of the pipeline with the given inputs and processing its outputs. Each module in the pipeline takes inputs, performs its designated task, and passes its outputs to the next module in the sequence or DAG.
The `Link` definition in the pipeline specifies the flow of data between modules. For example:
Link(src="input", dest="rewrite_template", src_key="query_str", dest_key="query_str")
This means:
1. Run the `input` module: execute the `input` module to produce its outputs.
2. Pass the output to `rewrite_template`: take the `query_str` output from the `input` module and use it as the `query_str` input for the `rewrite_template` module.

The execution order is determined by the dependencies between modules. A module can only run if all its required inputs are available. The `get_next_module_keys` method identifies which modules are ready to run based on their input dependencies.
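The readiness-driven loop described here can be captured in a small standalone sketch (illustrative only, not the actual `QueryPipeline` internals; modules here produce a single output value):

```python
def run_dag(modules, links, root_inputs):
    """modules: {name: fn(**kwargs)}; links: [(src, dest, dest_key)];
    root_inputs: {module_name: {key: value}} for root modules."""
    inputs = {name: dict(root_inputs.get(name, {})) for name in modules}
    # A module's required keys are whatever its incoming links feed it.
    required = {name: {key for (_, dest, key) in links if dest == name}
                for name in modules}
    done, outputs = set(), {}
    while len(done) < len(modules):
        # "Ready" means every required key is already present. Note that a
        # module with no incoming links is ready immediately -- the same
        # quirk that puts 'join'-like modules in the first batch.
        ready = [m for m in modules
                 if m not in done and required[m] <= set(inputs[m])]
        if not ready:
            raise RuntimeError("stuck: remaining modules never receive inputs")
        for m in ready:
            outputs[m] = modules[m](**inputs[m])
            for src, dest, key in links:
                if src == m:
                    inputs[dest][key] = outputs[m]
            done.add(m)
    return outputs
```

The scheduling idea is the same as in the real pipeline: run whatever is ready, feed outputs forward along the links, then re-scan; execution order falls out of the links, not of the order in which `add_link` was called.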
Consider the following simplified pipeline:
pipeline = QueryPipeline(
modules={
"input": input_component,
"rewrite_template": rewrite_template,
"llm": llm_c,
"rewrite_retriever": retriever1,
"query_retriever": retriever2,
"join": argpack_component,
"reranker": reranker,
"response_component": response_component,
},
verbose=True,
)
pipeline.add_link("input", "rewrite_template", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "rewrite_template", src_key="chat_history_str", dest_key="chat_history_str")
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")
pipeline.add_link("join", "reranker", dest_key="nodes")
pipeline.add_link("input", "reranker", src_key="query_str", dest_key="query_str")
pipeline.add_link("reranker", "response_component", dest_key="nodes")
pipeline.add_link("input", "response_component", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "response_component", src_key="chat_history", dest_key="chat_history")
1. The `input` module is ready to run because it has its required inputs.
2. Run the `input` module: execute the `input` module to produce its outputs.
3. The `rewrite_template` and `query_retriever` modules are now ready to run because they have received their required inputs from the `input` module.
4. Run `rewrite_template` and `query_retriever`: execute these modules to produce their outputs.
5. The `llm`, `rewrite_retriever`, and `join` modules are now ready to run based on the outputs of the previous modules.
6. Finally, the `response_component` module produces the final output.

To make the execution order more intuitive and self-documenting, you can use the `QueryPipeline` abstraction to chain together different modules in a sequential chain or a Directed Acyclic Graph (DAG). This approach enhances readability and reduces boilerplate code, making the workflow easier to understand and maintain.
Here is an example of defining and running a simple `QueryPipeline`:
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core import PromptTemplate
from llama_index.llms.openai import OpenAI
# Define the prompt and LLM
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
# Create the pipeline
p = QueryPipeline(chain=[prompt_tmpl, llm], verbose=True)
# Run the pipeline
output = p.run(movie_name="The Departed")
print(str(output))
For more complex workflows, you can define a DAG explicitly:
from llama_index.postprocessor.cohere_rerank import CohereRerank
from llama_index.core.response_synthesizers import TreeSummarize
# Define modules
prompt_str = "Please generate a question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=3)
reranker = CohereRerank()
summarizer = TreeSummarize(llm=llm)
# Define query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
{
"llm": llm,
"prompt_tmpl": prompt_tmpl,
"retriever": retriever,
"summarizer": summarizer,
"reranker": reranker,
}
)
# Add links between modules
p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")
# Run the pipeline
response = p.run(topic="YC")
print(str(response))
This approach ensures that the execution order is clear and the pipeline is self-documenting. Additionally, the `QueryPipeline` class provides methods like `print_debug_input` and `print_debug_input_multi` to help with debugging by printing the inputs to each module, making it easier to trace the flow of data through the pipeline [1][2][3].
@dosu, With additional debug statements, at some point in the pipeline, I get the output:
GE: return next_module_keys=[]
GE ==> next_module_keys=[], module_key='rewrite_retriever'
GE ==> inside if not next_module_keys
GE ==> module_key='rewrite_retriever'
GE ==> list_keys(output_dict)=['output']
GE: 2 len(result_outputs)=2
GE: 2 return_values_direct=True
GE ==> before check result_outputs.keys(): list(result_outputs.keys())=['response_component', 'rewrite_retriever']
GE: _get_single_result_output: list(result_outputs.keys())=['response_component', 'rewrite_retriever']
GE: len(result_outputs): len(result_outputs)=2
Notice `return next_module_keys=[]`, which indicates that there are no modules ready for execution, even though modules remain. Would that indicate an issue with the pipeline?
You explained earlier how 'input' and 'join' are ready to be executed because their input arguments are present. But where in `query.py` are the modules actually executed? Just because `join` is ready to be executed should not imply that it will be executed, or does it? I still don't understand how it is possible that the pipeline ends with multiple outputs. Can't the pipeline be checked for inconsistencies before execution starts?
The `return next_module_keys=[]` output indicates that there are no more modules ready for execution at that point in the pipeline. This can happen if all modules that can be executed with the available inputs have already been executed, but there are still modules that have not received all their required inputs.
Modules are executed in the `QueryPipeline` class within the `_run_multi` method. Here's a simplified explanation of how modules are executed:
1. The `get_next_module_keys` method identifies which modules are ready to run based on their input dependencies.
2. Each ready module is run with its inputs, and the `RunState` is updated with the outputs of the executed modules.

Here's a simplified version of the relevant code from the `QueryPipeline` class:
def _run_multi(self, module_input_dict: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
run_state = RunState(self.module_dict, module_input_dict)
while True:
next_module_keys = self.get_next_module_keys(run_state)
if not next_module_keys:
break
for module_key in next_module_keys:
module = self.module_dict[module_key]
module_input = run_state.all_module_inputs[module_key]
module_output = module.run(**module_input)
run_state.update(module_key, module_output)
return run_state.result_outputs, run_state.intermediates
To ensure the pipeline is consistent before execution, you can check for multiple root or leaf nodes, which might indicate an inconsistency if your pipeline is expected to be linear. Here’s how you can do this:
def check_pipeline_consistency(pipeline: QueryPipeline) -> None:
root_keys = pipeline.get_root_keys()
leaf_keys = pipeline.get_leaf_keys()
if len(root_keys) > 1:
raise ValueError(f"Pipeline has multiple root nodes: {root_keys}")
if len(leaf_keys) > 1:
raise ValueError(f"Pipeline has multiple leaf nodes: {leaf_keys}")
# Example usage
pipeline = QueryPipeline()
# Add modules and links to the pipeline
check_pipeline_consistency(pipeline)
The issue of multiple outputs can occur if the pipeline configuration allows for multiple modules to produce final outputs. To debug this, you can add more detailed logging to trace the flow of data and identify where multiple outputs are being produced.
Here’s an example of how you might add debugging statements to trace the execution flow:
def get_next_module_keys(self, run_state: RunState) -> List[str]:
next_module_keys = []
for module_key, module_input in run_state.all_module_inputs.items():
if module_key in run_state.executed_modules:
continue
required_keys = self.module_dict[module_key].free_req_input_keys
if all(key in module_input for key in required_keys):
next_module_keys.append(module_key)
print(f"GE: return {next_module_keys=}")
return next_module_keys
def _run_multi(self, module_input_dict: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
run_state = RunState(self.module_dict, module_input_dict)
while True:
next_module_keys = self.get_next_module_keys(run_state)
if not next_module_keys:
break
for module_key in next_module_keys:
module = self.module_dict[module_key]
module_input = run_state.all_module_inputs[module_key]
print(f"Executing module: {module_key} with inputs: {module_input}")
module_output = module.run(**module_input)
print(f"Module output: {module_output}")
run_state.update(module_key, module_output)
return run_state.result_outputs, run_state.intermediates
By ensuring that the pipeline is consistent before execution and adding detailed debugging statements, you can trace the flow of data and identify where multiple outputs are being produced. This approach helps in debugging and resolving issues related to multiple outputs in the QueryPipeline
[1][2][3][4].
@dosu: You write: "The issue of multiple outputs can occur if the pipeline configuration allows for multiple modules to produce final outputs. To debug this, you can add more detailed logging to trace the flow of data and identify where multiple outputs are being produced."
That sounds good, but the fact that the pipeline I am working with, which I shared with you earlier, produces multiple outputs that cannot be predicted from the pipeline definition does indicate some kind of error. And this error could be anywhere.
Here is a question, @dosu: what happens if there are no modules ready to be executed? Does the pipeline end? Or does the pipeline wait until there is a module ready to be executed? If the latter, how long is the wait? After all, how does QueryPipeline know that additional input is expected?
@dosu: I have read the documents introducing the query pipeline. Are there any advanced documents that provide the details of the pipeline along the lines of what you described above? LlamaIndex is great when it works, but pretty poor when one must debug an error. More knowledge about the internals of the framework (short of looking at source code) would be extremely helpful. Thanks.
If there are no modules ready to be executed, the pipeline will end. The `QueryPipeline` does not wait for additional input; it processes the available inputs and executes the modules that are ready based on the current state. Once no more modules can be executed with the available inputs, the pipeline completes its execution.
For more detailed information about the query pipeline, you can refer to the following advanced documents and resources:
- Query Pipeline Guide: This guide provides a comprehensive overview of the query pipeline, including usage patterns and module guides.
- Advanced Examples: These examples demonstrate advanced usage of the query pipeline, including setting up a text-to-SQL pipeline and using various techniques.
- Source Code: For a deeper dive into the implementation details, you can refer to the source code of the `QueryPipeline` class.

These resources should provide you with a more detailed understanding of the query pipeline and help you debug any issues you encounter [1][2][3][4][5].
@dosu, just FYI, not all advanced examples work. For example, my issues are with the `query_pipeline_memory.ipynb` example.
A maintainer has already been looped in.
@dosu thanks. Could you please tell me why `payload` and `query_payload` are accessed in the function `run()` in query.py?
@dispatcher.span
def run(
self,
*args: Any,
return_values_direct: bool = True,
callback_manager: Optional[CallbackManager] = None,
batch: bool = False,
**kwargs: Any,
) -> Any:
"""Run the pipeline."""
# first set callback manager
callback_manager = callback_manager or self.callback_manager
self.set_callback_manager(callback_manager)
with self.callback_manager.as_trace("query"):
# try to get query payload
try:
query_payload = json.dumps(kwargs)
except TypeError:
query_payload = json.dumps(str(kwargs))
with self.callback_manager.event(
CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_payload}
) as query_event:
outputs, _ = self._run(
*args,
return_values_direct=return_values_direct,
show_intermediates=False,
batch=batch,
**kwargs,
)
They do not appear to be used at all. Thanks.
The `payload` and `query_payload` variables in the `run()` function of `query.py` are used for logging and tracing purposes with the `CallbackManager`. Here's a detailed explanation:
Purpose of `query_payload` and `payload`:
- `query_payload`: serializes the `kwargs` dictionary into a JSON string. If serialization fails due to a `TypeError`, it falls back to converting the `kwargs` dictionary to a string and then serializing it. `query_payload` is used to log the query event, capturing the input arguments for tracing and debugging purposes.
- `payload`: passed to the `callback_manager.event` method, which includes the serialized `query_payload` under the key `EventPayload.QUERY_STR`.

Here's the relevant code snippet with comments:
@dispatcher.span
def run(
self,
*args: Any,
return_values_direct: bool = True,
callback_manager: Optional[CallbackManager] = None,
batch: bool = False,
**kwargs: Any,
) -> Any:
"""Run the pipeline."""
# First set callback manager
callback_manager = callback_manager or self.callback_manager
self.set_callback_manager(callback_manager)
# Start tracing the query event
with self.callback_manager.as_trace("query"):
# Try to get query payload
try:
query_payload = json.dumps(kwargs)
except TypeError:
query_payload = json.dumps(str(kwargs))
# Log the query event with the payload
with self.callback_manager.event(
CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_payload}
) as query_event:
# Run the pipeline
outputs, _ = self._run(
*args,
return_values_direct=return_values_direct,
show_intermediates=False,
batch=batch,
**kwargs,
)
return outputs
- `query_payload`: captures and serializes the input arguments for logging.
- `payload`: used to log the event details with the `CallbackManager`.

These variables are crucial for tracing and debugging the pipeline execution, even though they are not directly used in the computation within the `run()` method [1][2].
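To see why the `TypeError` fallback exists, note that not every kwarg is JSON-serializable (a list of `ChatMessage` objects, for instance). A small sketch of the same pattern:

```python
import json

def safe_payload(kwargs: dict) -> str:
    # Mirrors the fallback pattern: serialize kwargs directly if possible,
    # otherwise serialize their string representation instead.
    try:
        return json.dumps(kwargs)
    except TypeError:
        return json.dumps(str(kwargs))

print(safe_payload({"query_str": "Hello!"}))  # {"query_str": "Hello!"}
print(safe_payload({"obj": object()}))        # falls back to str(kwargs)
```

Either way the callback manager receives a string it can log, which is the whole point of the variable.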
@dosu, here is likely why I get multiple outputs on a query pipeline that is one of the advanced examples offered by LlamaIndex. LlamaIndex uses GPT-3.5 from OpenAI, whereas I use Llama3. At some point in my network, `get_next_module_keys()` returns an empty list when it should not. If we are not at the end of the pipeline, the output dictionary might have more than one entry (I don't know why exactly). Somehow, checking should be done to avoid this situation. If, in the middle of a pipeline, an empty list can lead to errors, the condition must obviously be avoided. I still have not figured this out. The only reason I am persisting is to decrease the likelihood of error in future networks, and because others might find this information useful.
The issue you're encountering with `get_next_module_keys()` returning an empty list prematurely, leading to multiple outputs in the result dictionary, can indeed cause problems in the pipeline execution. This situation can arise if the pipeline configuration or the state management within the pipeline does not correctly handle intermediate states.
Fix for empty `get_next_module_keys()`:
To address this issue, you can modify the `_run_multi` and `_arun_multi` methods to ensure that the final output is only added to `run_state.result_outputs` if it hasn't been added already. This prevents the pipeline from producing multiple outputs unexpectedly.
Here is the modified code for both methods:
def _run_multi(
self, module_input_dict: Dict[str, Any], show_intermediates=False
) -> Tuple[Dict[str, Any], Dict[str, ComponentIntermediates]]:
"""Run the pipeline for multiple roots."""
self._validate_inputs(module_input_dict)
run_state = self.get_run_state(module_input_dict)
# Add root inputs to all_module_inputs
next_module_keys = self.get_next_module_keys(run_state)
while True:
for module_key in next_module_keys:
module = run_state.module_dict[module_key]
module_input = run_state.all_module_inputs[module_key]
if self.verbose:
print_debug_input(module_key, module_input)
output_dict = module.run_component(**module_input)
if (
show_intermediates
and module_key not in run_state.intermediate_outputs
):
run_state.intermediate_outputs[module_key] = ComponentIntermediates(
inputs=module_input, outputs=output_dict
)
self.process_component_output(
output_dict,
module_key,
run_state,
)
next_module_keys = self.get_next_module_keys(
run_state,
)
if not next_module_keys:
if module_key not in run_state.result_outputs:
run_state.result_outputs[module_key] = output_dict
break
return run_state.result_outputs, run_state.intermediate_outputs
async def _arun_multi(
self, module_input_dict: Dict[str, Any], show_intermediates: bool = False
) -> Tuple[Dict[str, Any], Dict[str, ComponentIntermediates]]:
"""Run the pipeline for multiple roots.
kwargs is in the form of module_dict -> input_dict
input_dict is in the form of input_key -> input
"""
self._validate_inputs(module_input_dict)
run_state = self.get_run_state(module_input_dict)
# Add root inputs to all_module_inputs
next_module_keys = self.get_next_module_keys(run_state)
while True:
jobs = []
for module_key in next_module_keys:
module = run_state.module_dict[module_key]
module_input = run_state.all_module_inputs[module_key]
if self.verbose:
print_debug_input(module_key, module_input)
jobs.append(module.arun_component(**module_input))
output_dicts = await run_jobs(jobs, show_progress=self.show_progress)
for module_key, output_dict in zip(next_module_keys, output_dicts):
if (
show_intermediates
and module_key not in run_state.intermediate_outputs
):
run_state.intermediate_outputs[module_key] = ComponentIntermediates(
inputs=module_input, outputs=output_dict
)
self.process_component_output(
output_dict,
module_key,
run_state,
)
next_module_keys = self.get_next_module_keys(
run_state,
)
if not next_module_keys:
if module_key not in run_state.result_outputs:
run_state.result_outputs[module_key] = output_dict
break
return run_state.result_outputs, run_state.intermediate_outputs
- Check for Empty `next_module_keys`: the code checks if `next_module_keys` is empty. If it is, it ensures that the final output is only added to `run_state.result_outputs` if it hasn't been added already.
- Prevent Duplicates in `run_state.result_outputs`: the `module_key not in run_state.result_outputs` guard stops the same module's output from being recorded twice.

By implementing these changes, you can avoid the issue of multiple outputs in the result dictionary and ensure that the pipeline execution is consistent and reliable [1].
@dosu. Thanks. But when you write: "Check for Empty next_module_keys: The code checks if next_module_keys is empty. If it is, it ensures that the final output is only added to run_state.result_outputs if it hasn't been added already.", aren't you implying that my final state with multiple outputs is formed from the same output present multiple times? If that were not the case, your solution would not work.
The solution I provided ensures that the final output is only added to run_state.result_outputs
if it hasn't been added already, which prevents the same output from being added multiple times. However, if your final state with multiple outputs is formed from different outputs, this solution alone would not address the issue.
To handle the case where different outputs are being added to run_state.result_outputs
, you need to ensure that the pipeline configuration and the state management correctly handle intermediate states and consolidate outputs appropriately.
Here are additional steps to ensure that the pipeline produces a single consolidated output:
Combine Outputs: Ensure that all intermediate outputs are combined into a single output before the pipeline completes.
Check for Multiple Outputs: Add a check to ensure that only one output is present in run_state.result_outputs
before the pipeline completes.
Here’s the modified code with additional checks to handle different outputs:
```python
def _run_multi(
    self, module_input_dict: Dict[str, Any], show_intermediates=False
) -> Tuple[Dict[str, Any], Dict[str, ComponentIntermediates]]:
    """Run the pipeline for multiple roots."""
    self._validate_inputs(module_input_dict)
    run_state = self.get_run_state(module_input_dict)
    # Add root inputs to all_module_inputs
    next_module_keys = self.get_next_module_keys(run_state)

    while True:
        for module_key in next_module_keys:
            module = run_state.module_dict[module_key]
            module_input = run_state.all_module_inputs[module_key]
            if self.verbose:
                print_debug_input(module_key, module_input)
            output_dict = module.run_component(**module_input)
            if (
                show_intermediates
                and module_key not in run_state.intermediate_outputs
            ):
                run_state.intermediate_outputs[module_key] = ComponentIntermediates(
                    inputs=module_input, outputs=output_dict
                )
            self.process_component_output(
                output_dict,
                module_key,
                run_state,
            )

        next_module_keys = self.get_next_module_keys(
            run_state,
        )
        if not next_module_keys:
            # Ensure only one output is added to result_outputs
            if len(run_state.result_outputs) == 0:
                run_state.result_outputs[module_key] = output_dict
            else:
                raise ValueError("Multiple outputs detected in the final state.")
            break

    return run_state.result_outputs, run_state.intermediate_outputs


async def _arun_multi(
    self, module_input_dict: Dict[str, Any], show_intermediates: bool = False
) -> Tuple[Dict[str, Any], Dict[str, ComponentIntermediates]]:
    """Run the pipeline for multiple roots.

    kwargs is in the form of module_dict -> input_dict
    input_dict is in the form of input_key -> input
    """
    self._validate_inputs(module_input_dict)
    run_state = self.get_run_state(module_input_dict)
    # Add root inputs to all_module_inputs
    next_module_keys = self.get_next_module_keys(run_state)

    while True:
        jobs = []
        for module_key in next_module_keys:
            module = run_state.module_dict[module_key]
            module_input = run_state.all_module_inputs[module_key]
            if self.verbose:
                print_debug_input(module_key, module_input)
            jobs.append(module.arun_component(**module_input))

        output_dicts = await run_jobs(jobs, show_progress=self.show_progress)
        for module_key, output_dict in zip(next_module_keys, output_dicts):
            if (
                show_intermediates
                and module_key not in run_state.intermediate_outputs
            ):
                run_state.intermediate_outputs[module_key] = ComponentIntermediates(
                    inputs=module_input, outputs=output_dict
                )
            self.process_component_output(
                output_dict,
                module_key,
                run_state,
            )

        next_module_keys = self.get_next_module_keys(
            run_state,
        )
        if not next_module_keys:
            # Ensure only one output is added to result_outputs
            if len(run_state.result_outputs) == 0:
                run_state.result_outputs[module_key] = output_dicts[-1]
            else:
                raise ValueError("Multiple outputs detected in the final state.")
            break

    return run_state.result_outputs, run_state.intermediate_outputs
```
Before adding to `run_state.result_outputs`, the code checks if `run_state.result_outputs` already contains an output. If it does, a `ValueError` is raised, indicating that multiple outputs have been detected.

By implementing these changes, you can ensure that the pipeline produces a single consolidated output and handle cases where different outputs might be added to `run_state.result_outputs` [1][2].
@dosu: my final output is found in the debug comment below:
```
_get_single_result_output: list(result_outputs.keys())=['response_component', 'rewrite_retriever']
```
As you can see, the two elements are not the same. I still don't know how this happens, which is why I am analyzing the pipeline step by step.
The debug output you provided indicates that the final `result_outputs` dictionary contains two different keys: `'response_component'` and `'rewrite_retriever'`. This suggests that both of these modules are producing outputs that are being added to the final result, which should not happen if the pipeline is configured correctly to produce a single consolidated output.
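One way to reason about which key *should* be the final output: in a DAG-shaped pipeline, the real answer comes from sink modules, i.e. modules with no outgoing links. The sketch below is illustrative only (the `links` list is hand-copied from the `add_link` calls in the configuration; `sinks` is not a LlamaIndex API). It shows that `response_component` is the only sink, so `rewrite_retriever` appearing in `result_outputs` is a symptom of something going wrong, not an intended output:

```python
# Edges hand-copied from the pipeline's add_link calls (src -> dst).
links = [
    ("input", "rewrite_template"), ("rewrite_template", "llm"),
    ("llm", "rewrite_retriever"), ("input", "query_retriever"),
    ("rewrite_retriever", "join"), ("query_retriever", "join"),
    ("join", "reranker"), ("input", "reranker"),
    ("reranker", "response_component"), ("input", "response_component"),
]

modules = {src for src, _ in links} | {dst for _, dst in links}
has_outgoing = {src for src, _ in links}
sinks = modules - has_outgoing  # modules nothing depends on: the true outputs

print(sinks)  # {'response_component'}
```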
To understand how this situation arises, let's break down the pipeline configuration and execution flow:
Pipeline Configuration:
```python
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm_c,
        "rewrite_retriever": retriever1,
        "query_retriever": retriever2,
        "join": argpack_component,
        "reranker": reranker,
        "response_component": response_component,
    },
    verbose=True,
)

pipeline.add_link("input", "rewrite_template", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "rewrite_template", src_key="chat_history_str", dest_key="chat_history_str")
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")
pipeline.add_link("join", "reranker", dest_key="nodes")
pipeline.add_link("input", "reranker", src_key="query_str", dest_key="query_str")
pipeline.add_link("reranker", "response_component", dest_key="nodes")
pipeline.add_link("input", "response_component", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "response_component", src_key="chat_history", dest_key="chat_history")
```
Execution Flow:
1. The `input` module runs first, providing initial inputs.
2. The `rewrite_template` and `query_retriever` modules are then ready to run.
3. The `llm` module runs after `rewrite_template`.
4. The `rewrite_retriever` and `query_retriever` modules run next.
5. The `join` module combines the outputs from `rewrite_retriever` and `query_retriever`.
6. The `reranker` module processes the combined output.
7. Finally, the `response_component` module produces the final response.

Check Intermediate Outputs:
```python
def _run_multi(self, module_input_dict: Dict[str, Any], show_intermediates=False) -> Tuple[Dict[str, Any], Dict[str, ComponentIntermediates]]:
    self._validate_inputs(module_input_dict)
    run_state = self.get_run_state(module_input_dict)
    next_module_keys = self.get_next_module_keys(run_state)
    while True:
        for module_key in next_module_keys:
            module = run_state.module_dict[module_key]
            module_input = run_state.all_module_inputs[module_key]
            if self.verbose:
                print_debug_input(module_key, module_input)
            output_dict = module.run_component(**module_input)
            print(f"Module {module_key} output: {output_dict}")
            if show_intermediates and module_key not in run_state.intermediate_outputs:
                run_state.intermediate_outputs[module_key] = ComponentIntermediates(inputs=module_input, outputs=output_dict)
            self.process_component_output(output_dict, module_key, run_state)
        next_module_keys = self.get_next_module_keys(run_state)
        if not next_module_keys:
            if len(run_state.result_outputs) == 0:
                run_state.result_outputs[module_key] = output_dict
            else:
                raise ValueError("Multiple outputs detected in the final state.")
            break
    return run_state.result_outputs, run_state.intermediate_outputs
```
Ensure Single Output:
```python
def _run_multi(self, module_input_dict: Dict[str, Any], show_intermediates=False) -> Tuple[Dict[str, Any], Dict[str, ComponentIntermediates]]:
    self._validate_inputs(module_input_dict)
    run_state = self.get_run_state(module_input_dict)
    next_module_keys = self.get_next_module_keys(run_state)
    while True:
        for module_key in next_module_keys:
            module = run_state.module_dict[module_key]
            module_input = run_state.all_module_inputs[module_key]
            if self.verbose:
                print_debug_input(module_key, module_input)
            output_dict = module.run_component(**module_input)
            print(f"Module {module_key} output: {output_dict}")
            if show_intermediates and module_key not in run_state.intermediate_outputs:
                run_state.intermediate_outputs[module_key] = ComponentIntermediates(inputs=module_input, outputs=output_dict)
            self.process_component_output(output_dict, module_key, run_state)
        next_module_keys = self.get_next_module_keys(run_state)
        if not next_module_keys:
            if len(run_state.result_outputs) == 0:
                run_state.result_outputs[module_key] = output_dict
            else:
                # Combine outputs if multiple outputs are detected
                combined_output = self.combine_outputs(run_state.result_outputs, output_dict)
                run_state.result_outputs = {'final_output': combined_output}
            break
    return run_state.result_outputs, run_state.intermediate_outputs

def combine_outputs(self, result_outputs: Dict[str, Any], new_output: Dict[str, Any]) -> Dict[str, Any]:
    # Logic to combine multiple outputs into a single output
    combined_output = {**result_outputs, **new_output}
    return combined_output
```
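One caveat about merging with `{**result_outputs, **new_output}`: on a key collision the value from `new_output` silently wins, so an earlier module's result can be overwritten rather than truly combined. A quick plain-Python illustration of that right-hand-side-wins behavior:

```python
result_outputs = {"response_component": "answer A"}
new_output = {"response_component": "answer B", "rewrite_retriever": ["node1"]}

# Later mapping wins on key collisions.
combined = {**result_outputs, **new_output}
print(combined)  # {'response_component': 'answer B', 'rewrite_retriever': ['node1']}
```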
By following these steps, you can trace the flow of data through the pipeline, identify where multiple outputs are being produced, and ensure that the pipeline produces a single consolidated output. This approach should help you debug and resolve the issue with multiple outputs in the `QueryPipeline` [1][2].
@dosu, you deduced:
Execution Flow:
The input module runs first, providing initial inputs.
The rewrite_template and query_retriever modules are then ready to run.
The llm module runs after rewrite_template.
The rewrite_retriever and query_retriever modules run next.
The join module combines the outputs from rewrite_retriever and query_retriever.
The reranker module processes the combined output.
Finally, the response_component module produces the final response.
However, `input` ran first, then `join`.
The execution flow you observed, where the `input` module runs first followed by the `join` module, is determined by the dependencies and readiness of the modules in the pipeline. Here's a detailed explanation of why this happens:
Initial State:
- The `input` module is ready to run because it has all its required inputs (`query_str`, `chat_history`, and `chat_history_str`).

Running the `input` Module:
- The `input` module runs first, providing its outputs.

Determining Next Modules:
- After the `input` module runs, the pipeline checks which modules are ready to run next.
- The `join` module is ready to run because it does not have any required inputs initially. It is designed to combine inputs from other modules, but it can be executed without waiting for those inputs if it has no dependencies.

Why `join` Runs After `input`:
The `join` module runs after the `input` module because it is considered ready to run based on its configuration. The `join` module is designed to combine inputs from other modules, but it can be executed without waiting for those inputs if it has no dependencies.
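One plausible mechanism behind "`join` can be executed without waiting": the readiness check quoted elsewhere in this thread is `all(key in module_input for key in free_req_input_keys)`, and in Python `all()` over an empty iterable is vacuously `True`. So if a variadic component reports an empty set of required keys, it looks ready immediately. A minimal standalone sketch (the helper and key names are illustrative, not LlamaIndex API):

```python
def is_ready(module_input: dict, required_keys: set) -> bool:
    # Mirrors the readiness check: all required keys must already be present.
    return all(key in module_input for key in required_keys)

# A component that declares no required keys is "ready" even with no inputs,
# because all() over an empty iterable returns True.
print(is_ready({}, set()))                                   # True
print(is_ready({}, {"rewrite_nodes", "query_nodes"}))        # False
print(is_ready({"rewrite_nodes": [], "query_nodes": []},
               {"rewrite_nodes", "query_nodes"}))            # True
```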
To ensure that the execution order follows the intended flow, you need to carefully configure the dependencies between modules. Here's how you can do that:

1. Define Dependencies Explicitly.
2. Use Conditional Execution: implement checks so that modules like `join` only run when their required inputs are available.

Here's an example of how you might configure the pipeline to ensure the correct execution order:
```python
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm_c,
        "rewrite_retriever": retriever1,
        "query_retriever": retriever2,
        "join": argpack_component,
        "reranker": reranker,
        "response_component": response_component,
    },
    verbose=True,
)

pipeline.add_link("input", "rewrite_template", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "rewrite_template", src_key="chat_history_str", dest_key="chat_history_str")
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")
pipeline.add_link("join", "reranker", dest_key="nodes")
pipeline.add_link("input", "reranker", src_key="query_str", dest_key="query_str")
pipeline.add_link("reranker", "response_component", dest_key="nodes")
pipeline.add_link("input", "response_component", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "response_component", src_key="chat_history", dest_key="chat_history")
```
To debug the execution flow, you can add logging statements to trace the execution of each module:
```python
def _run_multi(self, module_input_dict: Dict[str, Any], show_intermediates=False) -> Tuple[Dict[str, Any], Dict[str, ComponentIntermediates]]:
    self._validate_inputs(module_input_dict)
    run_state = self.get_run_state(module_input_dict)
    next_module_keys = self.get_next_module_keys(run_state)
    while True:
        for module_key in next_module_keys:
            module = run_state.module_dict[module_key]
            module_input = run_state.all_module_inputs[module_key]
            if self.verbose:
                print_debug_input(module_key, module_input)
            output_dict = module.run_component(**module_input)
            print(f"Module {module_key} output: {output_dict}")
            if show_intermediates and module_key not in run_state.intermediate_outputs:
                run_state.intermediate_outputs[module_key] = ComponentIntermediates(inputs=module_input, outputs=output_dict)
            self.process_component_output(output_dict, module_key, run_state)
        next_module_keys = self.get_next_module_keys(run_state)
        if not next_module_keys:
            if len(run_state.result_outputs) == 0:
                run_state.result_outputs[module_key] = output_dict
            else:
                raise ValueError("Multiple outputs detected in the final state.")
            break
    return run_state.result_outputs, run_state.intermediate_outputs
```
By carefully configuring the dependencies and adding detailed logging, you can ensure that the pipeline executes in the intended order and debug any issues that arise during execution [1][2].
If "The join module is designed to combine inputs from other modules", and if the "join" runs without those inputs being available, something will eventually go wrong. Why doesn't "join" wait for the two inputs to be available?
The `join` module in the `QueryPipeline` is designed to combine inputs from other modules, but it should indeed wait for those inputs to be available before running. The behavior you're describing, where `join` runs without waiting for its required inputs, indicates a misconfiguration in the pipeline setup.
Ensuring `join` Waits for Required Inputs

To ensure that the `join` module only runs when both `rewrite_retriever` and `query_retriever` have provided their outputs, you need to configure the pipeline links correctly. This ensures that the `join` module receives inputs from both retrievers before it executes.

Here's how you can configure the pipeline to ensure the `join` module waits for the required inputs:
Define the Pipeline:
```python
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm_c,
        "rewrite_retriever": retriever1,
        "query_retriever": retriever2,
        "join": argpack_component,
        "reranker": reranker,
        "response_component": response_component,
    },
    verbose=True,
)
```
Add Links to Ensure Dependencies:
```python
pipeline.add_link("input", "rewrite_template", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "rewrite_template", src_key="chat_history_str", dest_key="chat_history_str")
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")
# Ensure join waits for both retrievers
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")
pipeline.add_link("join", "reranker", dest_key="nodes")
pipeline.add_link("input", "reranker", src_key="query_str", dest_key="query_str")
pipeline.add_link("reranker", "response_component", dest_key="nodes")
pipeline.add_link("input", "response_component", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "response_component", src_key="chat_history", dest_key="chat_history")
```
- The `add_link` method is used to define the dependencies between modules. By linking `rewrite_retriever` and `query_retriever` to `join` with different destination keys (`rewrite_nodes` and `query_nodes`), you ensure that `join` will only run when it has received inputs from both retrievers.
- The `QueryPipeline` will check the dependencies and ensure that `join` only runs when both `rewrite_retriever` and `query_retriever` have been executed and their outputs are available.

To trace the flow of data and execution order within the `QueryPipeline`, you can use debugging and logging tools provided by LlamaIndex:
```python
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))
```
Additionally, you can use the `CallbackManager` to add multiple callbacks for debugging:
```python
from llama_index.core import set_global_handler

set_global_handler("simple")
```
By configuring the pipeline correctly and using debugging tools, you can ensure that the `join` module waits for the required inputs before running, preventing issues related to premature execution [1][2].
@dosu: the two lines:
```python
# Ensure join waits for both retrievers
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")
```
were already part of my pipeline (I was hoping they weren't), and yet, `input` and `join` are executed first.
The issue where `input` and `join` are executed first, despite the `join` module being configured to wait for inputs from both `rewrite_retriever` and `query_retriever`, suggests that the pipeline's dependency management is not functioning as expected.

To ensure that the `join` module waits for the required inputs, you need to verify that the dependencies are correctly set up and that the pipeline's state management correctly handles these dependencies.
Check Module Dependencies:
- Ensure that the `join` module has the correct dependencies and that these dependencies are being respected during execution.

Add Detailed Logging:

Here's an example of how you might add detailed logging to trace the execution flow and ensure that the `join` module waits for its required inputs:
```python
def get_next_module_keys(self, run_state: RunState) -> List[str]:
    """Determine the next module keys to run based on the current state."""
    next_module_keys = []
    for module_key, module_input in run_state.all_module_inputs.items():
        if module_key in run_state.executed_modules:
            continue  # Module already executed
        required_keys = self.module_dict[module_key].free_req_input_keys
        if all(key in module_input for key in required_keys):
            next_module_keys.append(module_key)
    print(f"GE: return {next_module_keys=}")
    return next_module_keys

def _run_multi(self, module_input_dict: Dict[str, Any], show_intermediates=False) -> Tuple[Dict[str, Any], Dict[str, ComponentIntermediates]]:
    self._validate_inputs(module_input_dict)
    run_state = self.get_run_state(module_input_dict)
    next_module_keys = self.get_next_module_keys(run_state)
    while True:
        for module_key in next_module_keys:
            module = run_state.module_dict[module_key]
            module_input = run_state.all_module_inputs[module_key]
            if self.verbose:
                print_debug_input(module_key, module_input)
            output_dict = module.run_component(**module_input)
            print(f"Module {module_key} output: {output_dict}")
            if show_intermediates and module_key not in run_state.intermediate_outputs:
                run_state.intermediate_outputs[module_key] = ComponentIntermediates(inputs=module_input, outputs=output_dict)
            self.process_component_output(output_dict, module_key, run_state)
        next_module_keys = self.get_next_module_keys(run_state)
        if not next_module_keys:
            if len(run_state.result_outputs) == 0:
                run_state.result_outputs[module_key] = output_dict
            else:
                raise ValueError("Multiple outputs detected in the final state.")
            break
    return run_state.result_outputs, run_state.intermediate_outputs
```
- The `print_debug_input` and `print` statements help trace the execution flow and verify that the dependencies are being respected.
- Verify that the `join` module only runs when it has received inputs from both `rewrite_retriever` and `query_retriever`.

By adding detailed logging and verifying the dependencies, you can ensure that the `join` module waits for the required inputs before running. This approach helps debug and resolve issues related to premature execution in the `QueryPipeline` [1][2].
@dosu: correct me if I am mistaken, but the first time `get_next_module_keys()` is called, `join` should not be in the list of modules ready to execute. So there is no need to check the `_run_multi(self, ...)` method.
Yes, you are correct. The `join` module should not be in the list of modules ready to execute the first time `get_next_module_keys()` is called. The `join` action is designed to be the last action in the plan, as indicated by the `JOIN_DESCRIPTION`:
```python
JOIN_DESCRIPTION = (
    "join():\n"
    " - Collects and combines results from prior actions.\n"
    " - A LLM agent is called upon invoking join to either finalize the user query or wait until the plans are executed.\n"
    " - join should always be the last action in the plan, and will be called in two scenarios:\n"
    "   (a) if the answer can be determined by gathering the outputs from tasks to generate the final response.\n"
    "   (b) if the answer cannot be determined in the planning phase before you execute the plans. "
)
```
To ensure that `join` waits for the required inputs before executing, you need to make sure that the plan generated by the LLM adheres to the guidelines provided in the `generate_llm_compiler_prompt` function, specifically:

```python
" - Always call join as the last action in the plan. Say '{END_OF_PLAN}' after you call join\n"
```

If `join` is appearing too early, it might be due to an issue in the plan generation logic or the LLM's interpretation of the prompt. You should verify that the prompt provided to the LLM correctly emphasizes that `join` should be the final action.
Additionally, the `get_next_module_keys` function in the `QueryPipeline` class determines the next module keys to run based on the current state. It ensures that only modules with all required inputs available and not yet executed are returned as the next modules to run:
```python
def get_next_module_keys(self, run_state: RunState) -> List[str]:
    """Determine the next module keys to run based on the current state."""
    next_module_keys = []
    for module_key, module_input in run_state.all_module_inputs.items():
        if module_key in run_state.executed_modules:
            continue  # Module already executed
        if all(
            key in module_input
            for key in self.module_dict[module_key].free_req_input_keys
        ):
            next_module_keys.append(module_key)
    return next_module_keys
```
This function ensures that `join` will only be executed when all its required inputs are available, thus preventing it from being executed prematurely [1][2][3].
@dosu, I am zeroing in. In RunState, I get the following:
```
RunState, 2, list(self.all_module_inputs.items())=[('input', {'query_str': 'Hello!', 'chat_history': [], 'chat_history_str': ''}), ('rewrite_template', {}), ('llm', {}), ('rewrite_retriever', {}), ('query_retriever', {}), ('join', {}), ('reranker', {}), ('response_component', {})]
```
Here is my RunState with debug statements:
```python
class RunState:
    def __init__(
        self,
        module_dict: Dict[str, QueryComponent],
        module_input_dict: Dict[str, Dict[str, Any]],
    ):
        print(f"GE, RunState, {list(module_dict.keys())=}")
        self.all_module_inputs: Dict[str, Dict[str, Any]] = {
            module_key: {} for module_key in module_dict
        }
        print(f"GE, RunState, 1, {list(self.all_module_inputs.items())=}")
        for module_key, input_dict in module_input_dict.items():
            self.all_module_inputs[module_key] = input_dict
        print(f"GE, RunState, 2, {list(self.all_module_inputs.items())=}")
        self.module_dict = module_dict
        self.result_outputs: Dict[str, Any] = {}
        self.intermediate_outputs: Dict[str, ComponentIntermediates] = {}
        self.executed_modules: Set[str] = set()
```
The debug output you provided from the `RunState` initialization shows that the `all_module_inputs` dictionary is being correctly populated with the initial inputs for the `input` module, while other modules have empty input dictionaries:
```
RunState, 2, list(self.all_module_inputs.items())=[
    ('input', {'query_str': 'Hello!', 'chat_history': [], 'chat_history_str': ''}),
    ('rewrite_template', {}),
    ('llm', {}),
    ('rewrite_retriever', {}),
    ('query_retriever', {}),
    ('join', {}),
    ('reranker', {}),
    ('response_component', {})
]
```
This indicates that the `input` module has its required inputs, but other modules, including `join`, do not have their inputs yet.
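Given that snapshot of `all_module_inputs`, you can replay the readiness check by hand. The sketch below is a standalone simulation, not pipeline code: the `required` sets are assumptions mirroring the `add_link` configuration, and `join` is given an empty required-key set (which is what a variadic arg-pack style component would effectively report). Under those assumptions it reproduces the observed behavior, with `input` and `join` both coming back as ready on the first pass:

```python
all_module_inputs = {
    "input": {"query_str": "Hello!", "chat_history": [], "chat_history_str": ""},
    "rewrite_template": {}, "llm": {}, "rewrite_retriever": {},
    "query_retriever": {}, "join": {}, "reranker": {}, "response_component": {},
}

# Assumed required input keys per module; note that join's set is empty.
required = {
    "input": set(),
    "rewrite_template": {"query_str", "chat_history_str"},
    "llm": {"prompt"},
    "rewrite_retriever": {"input"},
    "query_retriever": {"input"},
    "join": set(),  # variadic: no declared required keys
    "reranker": {"nodes", "query_str"},
    "response_component": {"nodes", "query_str", "chat_history"},
}

ready = [key for key, inputs in all_module_inputs.items()
         if all(k in inputs for k in required[key])]
print(ready)  # ['input', 'join']
```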
To ensure that the `join` module waits for its required inputs before executing, you need to verify that the dependencies are correctly set up and that the pipeline's state management correctly handles these dependencies.
Check Module Dependencies:
- Ensure that the `join` module has the correct dependencies and that these dependencies are being respected during execution.

Add Detailed Logging:

Here's an example of how you might add detailed logging to trace the execution flow and ensure that the `join` module waits for its required inputs:
```python
def get_next_module_keys(self, run_state: RunState) -> List[str]:
    """Determine the next module keys to run based on the current state."""
    next_module_keys = []
    for module_key, module_input in run_state.all_module_inputs.items():
        if module_key in run_state.executed_modules:
            continue  # Module already executed
        required_keys = self.module_dict[module_key].free_req_input_keys
        if all(key in module_input for key in required_keys):
            next_module_keys.append(module_key)
    print(f"GE: return {next_module_keys=}")
    return next_module_keys

def _run_multi(self, module_input_dict: Dict[str, Any], show_intermediates=False) -> Tuple[Dict[str, Any], Dict[str, ComponentIntermediates]]:
    self._validate_inputs(module_input_dict)
    run_state = self.get_run_state(module_input_dict)
    next_module_keys = self.get_next_module_keys(run_state)
    while True:
        for module_key in next_module_keys:
            module = run_state.module_dict[module_key]
            module_input = run_state.all_module_inputs[module_key]
            if self.verbose:
                print_debug_input(module_key, module_input)
            output_dict = module.run_component(**module_input)
            print(f"Module {module_key} output: {output_dict}")
            if show_intermediates and module_key not in run_state.intermediate_outputs:
                run_state.intermediate_outputs[module_key] = ComponentIntermediates(inputs=module_input, outputs=output_dict)
            self.process_component_output(output_dict, module_key, run_state)
        next_module_keys = self.get_next_module_keys(run_state)
        if not next_module_keys:
            if len(run_state.result_outputs) == 0:
                run_state.result_outputs[module_key] = output_dict
            else:
                raise ValueError("Multiple outputs detected in the final state.")
            break
    return run_state.result_outputs, run_state.intermediate_outputs
```
- The `print_debug_input` and `print` statements help trace the execution flow and verify that the dependencies are being respected.
- Verify that the `join` module only runs when it has received inputs from both `rewrite_retriever` and `query_retriever`.

By adding detailed logging and verifying the dependencies, you can ensure that the `join` module waits for the required inputs before running. This approach helps debug and resolve issues related to premature execution in the `QueryPipeline` [1][2][3].
@dosu, doesn't `join` require the output of `rewrite_retriever` and `query_retriever` to be available? Shouldn't that be specified somewhere? Where is the output of `rewrite_retriever` and `query_retriever` specified? How would `join` know that these outputs are required, and so it must wait for the outputs before executing?
Yes, the `join` module does require the outputs of `rewrite_retriever` and `query_retriever` to be available before it can execute. This dependency should be specified in the pipeline configuration using the `add_link` method. The `add_link` method defines the dependencies between modules, ensuring that a module only runs when all its required inputs are available.

To ensure that the `join` module waits for the outputs of `rewrite_retriever` and `query_retriever`, you need to specify these dependencies explicitly in the pipeline configuration. Here's how you can do that:
Define the Pipeline:
```python
pipeline = QueryPipeline(
    modules={
        "input": input_component,
        "rewrite_template": rewrite_template,
        "llm": llm_c,
        "rewrite_retriever": retriever1,
        "query_retriever": retriever2,
        "join": argpack_component,
        "reranker": reranker,
        "response_component": response_component,
    },
    verbose=True,
)
```
Add Links to Ensure Dependencies:
```python
pipeline.add_link("input", "rewrite_template", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "rewrite_template", src_key="chat_history_str", dest_key="chat_history_str")
pipeline.add_link("rewrite_template", "llm")
pipeline.add_link("llm", "rewrite_retriever")
pipeline.add_link("input", "query_retriever", src_key="query_str")
# Ensure join waits for both retrievers
pipeline.add_link("rewrite_retriever", "join", dest_key="rewrite_nodes")
pipeline.add_link("query_retriever", "join", dest_key="query_nodes")
pipeline.add_link("join", "reranker", dest_key="nodes")
pipeline.add_link("input", "reranker", src_key="query_str", dest_key="query_str")
pipeline.add_link("reranker", "response_component", dest_key="nodes")
pipeline.add_link("input", "response_component", src_key="query_str", dest_key="query_str")
pipeline.add_link("input", "response_component", src_key="chat_history", dest_key="chat_history")
```
How `join` Knows to Wait

The `join` module knows to wait for the outputs of `rewrite_retriever` and `query_retriever` because of the dependencies specified using the `add_link` method. The `add_link` method creates a directed edge in the pipeline's dependency graph, indicating that the `join` module depends on the outputs of `rewrite_retriever` and `query_retriever`.
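As a sanity check on the order those links imply, you can topologically sort the module graph with Python's standard-library `graphlib`. This is an illustrative sketch (the `deps` mapping of module to predecessors is hand-derived from the `add_link` calls, not read from the pipeline object); in any valid dependency ordering, `join` must come after both retrievers and `response_component` must come last:

```python
from graphlib import TopologicalSorter

# Predecessors per module, hand-derived from the add_link calls above.
deps = {
    "input": set(),
    "rewrite_template": {"input"},
    "llm": {"rewrite_template"},
    "rewrite_retriever": {"llm"},
    "query_retriever": {"input"},
    "join": {"rewrite_retriever", "query_retriever"},
    "reranker": {"join", "input"},
    "response_component": {"reranker", "input"},
}

order = list(TopologicalSorter(deps).static_order())
# 'join' appears only after both retrievers; 'response_component' is last.
print(order[-1])  # response_component
```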
Here's an example of how the `get_next_module_keys` method ensures that the `join` module only runs when its required inputs are available:
```python
def get_next_module_keys(self, run_state: RunState) -> List[str]:
    """Determine the next module keys to run based on the current state."""
    next_module_keys = []
    for module_key, module_input in run_state.all_module_inputs.items():
        if module_key in run_state.executed_modules:
            continue  # Module already executed
        required_keys = self.module_dict[module_key].free_req_input_keys
        if all(key in module_input for key in required_keys):
            next_module_keys.append(module_key)
    return next_module_keys
```
To trace the execution flow and ensure that the `join` module waits for its required inputs, you can add logging statements:
```python
def _run_multi(self, module_input_dict: Dict[str, Any], show_intermediates=False) -> Tuple[Dict[str, Any], Dict[str, ComponentIntermediates]]:
    self._validate_inputs(module_input_dict)
    run_state = self.get_run_state(module_input_dict)
    next_module_keys = self.get_next_module_keys(run_state)
    while True:
        for module_key in next_module_keys:
            module = run_state.module_dict[module_key]
            module_input = run_state.all_module_inputs[module_key]
            if self.verbose:
                print_debug_input(module_key, module_input)
            output_dict = module.run_component(**module_input)
            print(f"Module {module_key} output: {output_dict}")
            if show_intermediates and module_key not in run_state.intermediate_outputs:
                run_state.intermediate_outputs[module_key] = ComponentIntermediates(inputs=module_input, outputs=output_dict)
            self.process_component_output(output_dict, module_key, run_state)
        next_module_keys = self.get_next_module_keys(run_state)
        if not next_module_keys:
            if len(run_state.result_outputs) == 0:
                run_state.result_outputs[module_key] = output_dict
            else:
                raise ValueError("Multiple outputs detected in the final state.")
            break
    return run_state.result_outputs, run_state.intermediate_outputs
```
By ensuring that the dependencies are correctly specified and adding detailed logging, you can verify that the `join` module waits for the required inputs before executing, preventing issues related to premature execution in the `QueryPipeline` [1][2].
Question Validation
Question
I am playing with the example `query_pipeline_memory.ipynb` notebook. There is an error in the cell. Specifically, in English: `response.message` is not found. That is very hard to believe. I am running LlamaIndex version 0.10.38. I also include my `pyproject.toml` file. I don't understand what is wrong. Any help is appreciated. Thanks.