run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: Can we apply the Retry Agent for the Query Time Sample Row Retrieval with chunk-based indexing of tables? #13722

Open Mathin21 opened 4 months ago

Mathin21 commented 4 months ago

Question Validation

Question

Can we apply the Retry Agent for the Query Time Sample Row Retrieval with chunk-based indexing of tables?

The documentation link for the Query Time Sample Row Retrieval is => https://docs.llamaindex.ai/en/stable/examples/pipeline/query_pipeline_sql/?h=advanced+row

The Retry Agent is built on top of the Query Time Sample Row Retrieval with chunk-based indexing of tables for a question-answering bot. The base Query Time Sample Row Retrieval pipeline consists of the following modules:

qp = QP(
    modules={
        "input": InputComponent(),
        "table_retriever": obj_retriever,
        "table_output_parser": table_parser_component,
        "text2sql_prompt": text2sql_prompt,
        "text2sql_llm": llm,
        "sql_output_parser": sql_parser_component,
        "sql_retriever": sql_retriever,
        "response_synthesis_prompt": response_synthesis_prompt,
        "response_synthesis_llm": llm,
    },
    verbose=True,
)

The documentation link for the Simple Retry Agent is => https://docs.llamaindex.ai/en/stable/examples/agent/agent_runner/query_pipeline_agent/?h=agent+around

The retry agent is built by overriding the modules of the Query Time Sample Row Retrieval query pipeline as follows:

Define the query pipeline with retry agent

qp = QP(
    modules={
        "input": agent_input_component,
        "table_retriever": obj_retriever,
        "table_output_parser": table_parser_component,
        "text2sql_prompt": text2sql_prompt,
        "text2sql_llm": llm,
        "retry_prompt": retry_prompt,
        "sql_output_parser": sql_parser_component,
        "sql_retriever": sql_retriever,
        "response_synthesis_prompt": response_synthesis_prompt,
        "response_synthesis_llm": llm,
        "output_component": agent_output_component,
    },
    verbose=True,
)

Add links and chains for the query pipeline

qp.add_link("input", "retry_prompt", src_key="input", dest_key="input") qp.add_link( "input", "retry_prompt", src_key="convo_history", dest_key="convo_history" ) qp.add_link("input", "table_retriever") qp.add_link("input", "table_output_parser", dest_key="query_str") qp.add_link( "table_retriever", "table_output_parser", dest_key="table_schema_objs" ) qp.add_link("table_output_parser", "text2sql_prompt", dest_key="schema") qp.add_chain( ["text2sql_prompt", "text2sql_llm", "sql_output_parser", "sql_retriever"] ) qp.add_link( "sql_output_parser", "response_synthesis_prompt", dest_key="sql_query" ) qp.add_link( "sql_retriever", "response_synthesis_prompt", dest_key="context_str" ) qp.add_link("input", "response_synthesis_prompt", dest_key="query_str") qp.add_link("response_synthesis_prompt", "response_synthesis_llm") qp.add_link("response_synthesis_llm", "output_component")

qp.add_chain( [ "retry_prompt", "sql_output_parser", "sql_retriever", "response_synthesis_prompt", "response_synthesis_llm", "agent_output_component", ] )

While running the agent, it results in the error below. How can this problem be overcome?

Create the agent worker and runner

agent_worker = QueryPipelineAgentWorker(qp)
agent1 = AgentRunner(
    agent_worker, callback_manager=CallbackManager(), verbose=False
)
response = agent1.chat(
    "How many states are there in india?"
)
print(str(response))

Running module input with input: state: {'sources': [], 'memory': ChatMemoryBuffer(token_limit=3000, tokenizer_fn=functools.partial(<bound method Encoding.encode of <Encoding 'cl100k_base'>>, allowed_special='all'), chat_store=SimpleChatStore(store={}), chat_store_key='chat_history')} task: task_id='d6f08a76-3251-4df6-bb6c-934ce9c5a1f0' input='How many states are there in india?' memory=ChatMemoryBuffer(token_limit=3000, tokenizer_fn=functools.partial(<bound method Encoding.encode of <Encoding 'cl100k_base'>>, allowed_special='all'), chat_store=SimpleChatStore(store={}), chat_store_key='chat_history') callback_manager=<llama_index.core.callbacks.base.CallbackManager object at 0x7f7171cd0190> extra_state={}


ValueError Traceback (most recent call last) Cell In[89], line 1 ----> 1 response = agent1.chat( 2 "How many states are there in india?" 3 ) 4 print(str(response))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:274, in Dispatcher.span.<locals>.wrapper(func, instance, args, kwargs) 270 self.span_enter( 271 id_=id_, bound_args=bound_args, instance=instance, parent_id=parent_id 272 ) 273 try: --> 274 result = func(*args, **kwargs) 275 except BaseException as e: 276 self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/callbacks/utils.py:41, in trace_method.<locals>.decorator.<locals>.wrapper(self, *args, **kwargs) 39 callback_manager = cast(CallbackManager, callback_manager) 40 with callback_manager.as_trace(trace_id): ---> 41 return func(self, *args, **kwargs)

File /opt/conda/lib/python3.10/site-packages/llama_index/core/agent/runner/base.py:644, in AgentRunner.chat(self, message, chat_history, tool_choice) 639 tool_choice = self.default_tool_choice 640 with self.callback_manager.event( 641 CBEventType.AGENT_STEP, 642 payload={EventPayload.MESSAGES: [message]}, 643 ) as e: --> 644 chat_response = self._chat( 645 message=message, 646 chat_history=chat_history, 647 tool_choice=tool_choice, 648 mode=ChatResponseMode.WAIT, 649 ) 650 assert isinstance(chat_response, AgentChatResponse) 651 e.on_end(payload={EventPayload.RESPONSE: chat_response})

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:274, in Dispatcher.span.<locals>.wrapper(func, instance, args, kwargs) 270 self.span_enter( 271 id_=id_, bound_args=bound_args, instance=instance, parent_id=parent_id 272 ) 273 try: --> 274 result = func(*args, **kwargs) 275 except BaseException as e: 276 self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/agent/runner/base.py:574, in AgentRunner._chat(self, message, chat_history, tool_choice, mode) 571 dispatch_event(AgentChatWithStepStartEvent(user_msg=message)) 572 while True: 573 # pass step queue in as argument, assume step executor is stateless --> 574 cur_step_output = self._run_step( 575 task.task_id, mode=mode, tool_choice=tool_choice 576 ) 578 if cur_step_output.is_last: 579 result_output = cur_step_output

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:274, in Dispatcher.span.<locals>.wrapper(func, instance, args, kwargs) 270 self.span_enter( 271 id_=id_, bound_args=bound_args, instance=instance, parent_id=parent_id 272 ) 273 try: --> 274 result = func(*args, **kwargs) 275 except BaseException as e: 276 self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/agent/runner/base.py:411, in AgentRunner._run_step(self, task_id, step, input, mode, **kwargs) 407 # TODO: figure out if you can dynamically swap in different step executors 408 # not clear when you would do that but theoretically possible 410 if mode == ChatResponseMode.WAIT: --> 411 cur_step_output = self.agent_worker.run_step(step, task, **kwargs) 412 elif mode == ChatResponseMode.STREAM: 413 cur_step_output = self.agent_worker.stream_step(step, task, **kwargs)

File /opt/conda/lib/python3.10/site-packages/llama_index/core/callbacks/utils.py:41, in trace_method.<locals>.decorator.<locals>.wrapper(self, *args, **kwargs) 39 callback_manager = cast(CallbackManager, callback_manager) 40 with callback_manager.as_trace(trace_id): ---> 41 return func(self, *args, **kwargs)

File /opt/conda/lib/python3.10/site-packages/llama_index/core/agent/custom/pipeline_worker.py:154, in QueryPipelineAgentWorker.run_step(self, step, task, **kwargs) 151 for agent_fn_component in self.agent_components: 152 agent_fn_component.partial(task=task, state=step.step_state) --> 154 agent_response, is_done = self.pipeline.run(state=step.step_state, task=task) 155 response = self._get_task_step_response(agent_response, step, is_done) 156 # sync step state with task state

File /opt/conda/lib/python3.10/site-packages/llama_index/core/query_pipeline/query.py:321, in QueryPipeline.run(self, return_values_direct, callback_manager, batch, *args, **kwargs) 317 query_payload = json.dumps(str(kwargs)) 318 with self.callback_manager.event( 319 CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_payload} 320 ) as query_event: --> 321 outputs, _ = self._run( 322 *args, 323 return_values_direct=return_values_direct, 324 show_intermediates=False, 325 batch=batch, 326 **kwargs, 327 ) 329 return outputs

File /opt/conda/lib/python3.10/site-packages/llama_index/core/query_pipeline/query.py:673, in QueryPipeline._run(self, return_values_direct, show_intermediates, batch, *args, **kwargs) 671 return result_outputs, intermediates 672 else: --> 673 result_outputs, intermediates = self._run_multi( 674 {root_key: kwargs}, show_intermediates=show_intermediates 675 ) 677 return ( 678 self._get_single_result_output(result_outputs, return_values_direct), 679 intermediates, 680 )

File /opt/conda/lib/python3.10/site-packages/llama_index/core/query_pipeline/query.py:884, in QueryPipeline._run_multi(self, module_input_dict, show_intermediates) 879 intermediate_outputs[module_key] = ComponentIntermediates( 880 inputs=module_input, outputs=output_dict 881 ) 883 # get new nodes and is_leaf --> 884 queue = self._process_component_output( 885 queue, output_dict, module_key, all_module_inputs, result_outputs 886 ) 888 return result_outputs, intermediate_outputs

File /opt/conda/lib/python3.10/site-packages/llama_index/core/query_pipeline/query.py:769, in QueryPipeline._process_component_output(self, queue, output_dict, module_key, all_module_inputs, result_outputs) 767 # everything not in conditional_edge_list is regular 768 for _, dest, attr in edge_list: --> 769 output = get_output(attr.get("src_key"), output_dict) 771 # if input_fn is not None, use it to modify the input 772 if attr["input_fn"] is not None:

File /opt/conda/lib/python3.10/site-packages/llama_index/core/query_pipeline/query.py:45, in get_output(src_key, output_dict) 42 if src_key is None: 43 # ensure that output_dict only has one key 44 if len(output_dict) != 1: ---> 45 raise ValueError("Output dict must have exactly one key.") 46 output = next(iter(output_dict.values())) 47 else:

ValueError: Output dict must have exactly one key.

dosubot[bot] commented 4 months ago

Hey @Mathin21! I'm here to help you out with any bugs, questions, or contributions you have in mind. Let's tackle this together!

To overcome the "Output dict must have exactly one key" error when applying the Retry Agent for the Query Time Sample Row Retrieval with chunk-based indexing of tables, ensure that the output dictionary from your function or method contains exactly one key. This error typically occurs when the output dictionary has either no keys or more than one key.

Here is an example of how you can structure your function to ensure the output dictionary has exactly one key:

def retrieve_sample_row(query):
    # Your logic to retrieve the sample row based on the query
    sample_row = {"row_data": "sample data"}  # Example output

    # Ensure the output dictionary has exactly one key
    if len(sample_row) != 1:
        raise ValueError("Output dict must have exactly one key")

    return sample_row

Additionally, you can refer to the get_output function from the LlamaIndex library, which ensures that the output dictionary has exactly one key:

def get_output(
    src_key: Optional[str],
    output_dict: Dict[str, Any],
) -> Any:
    """Add input to module deps inputs."""
    # get relevant output from link
    if src_key is None:
        # ensure that output_dict only has one key
        if len(output_dict) != 1:
            raise ValueError("Output dict must have exactly one key.")
        output = next(iter(output_dict.values()))
    else:
        output = output_dict[src_key]
    return output

This function checks if the output_dict has exactly one key and raises a ValueError if it does not. You can incorporate similar logic into your code to handle the output dictionary validation [1][2].
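
Note that get_output only enforces the single-key rule when a link does not name a src_key; the else branch simply reads output_dict[src_key]. So where a module legitimately emits several keys (for example, an agent input component that returns both "input" and "convo_history"), one alternative is to name src_key on each outgoing link from that module. A minimal sketch against the pipeline shown above; whether it resolves your particular error depends on which module is actually emitting multiple keys:

# Sketch: give links that leave a multi-key module an explicit src_key so that
# get_output takes the src_key branch instead of requiring a single output key.
# The key names below assume the agent input component returns
# {"input": ..., "convo_history": ...}, as the links above suggest.
qp.add_link("input", "table_retriever", src_key="input")
qp.add_link("input", "table_output_parser", src_key="input", dest_key="query_str")
qp.add_link("input", "response_synthesis_prompt", src_key="input", dest_key="query_str")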


Mathin21 commented 4 months ago

Which of the query pipeline modules is producing the multiple outputs? It isn't possible to identify the offending output from the error alone.

dosubot[bot] commented 4 months ago

To resolve the "Output dict must have exactly one key" error in your query pipeline setup, you need to ensure that each component in the pipeline returns a dictionary with exactly one key. This error typically occurs when a component's output dictionary has either no keys or more than one key.

Here are the steps to identify and fix the issue:

  1. Check Each Component's Output: Ensure that each component in your query pipeline returns a dictionary with exactly one key; for example, a parser component should emit a single keyed value rather than several top-level keys (a small diagnostic sketch for locating the offending link follows these steps).

  2. Modify _run_component and _arun_component Methods: In your AgentFnComponent class, modify the _run_component and _arun_component methods to return a dictionary with a single key, typically named "output".

Here is an example of how to structure these methods:

def _run_component(self, **kwargs: Any) -> Dict:
    """Run component."""
    output = self.fn(**kwargs)
    # Ensure the output is a dictionary with exactly one key
    return {"output": output}

async def _arun_component(self, **kwargs: Any) -> Any:
    """Run component (async)."""
    if self.async_fn is None:
        return self._run_component(**kwargs)
    else:
        output = await self.async_fn(**kwargs)
        # Ensure the output is a dictionary with exactly one key
        return {"output": output}

  3. Validate Output Keys: Ensure that the OutputKeys class in your pipeline schema is correctly validating the output keys. The OutputKeys class should be used to define the required output keys and validate them:

class OutputKeys(BaseModel):
    """Output keys."""

    required_keys: Set[str] = Field(default_factory=set)

    @classmethod
    def from_keys(
        cls,
        required_keys: Set[str],
    ) -> "OutputKeys":
        """Create OutputKeys from tuple."""
        return cls(required_keys=required_keys)

    def validate(self, input_keys: Set[str]) -> None:
        """Validate input keys."""
        # validate that input keys exactly match required keys
        if input_keys != self.required_keys:
            raise ValueError(
                f"Input keys {input_keys} do not match required keys {self.required_keys}"
            )

  4. Check the get_output Function: The get_output function in the QueryPipeline class ensures that, when a link does not specify a src_key, the output dictionary has exactly one key. Make sure this function is used correctly in your pipeline:

def get_output(
    src_key: Optional[str],
    output_dict: Dict[str, Any],
) -> Any:
    """Add input to module deps inputs."""
    # get relevant output from link
    if src_key is None:
        # ensure that output_dict only has one key
        if len(output_dict) != 1:
            raise ValueError("Output dict must have exactly one key.")
        output = next(iter(output_dict.values()))
    else:
        output = output_dict[src_key]
    return output
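
As a quick way to apply step 1 and narrow down which module is at fault, you can list every link that relies on the single-key assumption. This is a small diagnostic sketch, not a stable API: it assumes qp is the QueryPipeline instance and reads its internal qp.dag graph, whose edge attributes carry src_key as seen in _process_component_output above.

# Diagnostic sketch: any link without a src_key requires its source module to
# return a dict with exactly one key, otherwise get_output raises
# "Output dict must have exactly one key."
for src, dest, attr in qp.dag.edges(data=True):
    if attr.get("src_key") is None:
        print(f"link {src} -> {dest} has no src_key; "
              f"'{src}' must emit exactly one output key")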

By following these steps, you can ensure that your query pipeline components return the correct output format, thereby resolving the "Output dict must have exactly one key" error [1][2][3][4][5].


Mathin21 commented 4 months ago

@dosu we can't override this get_output function because each module has multiple input and output requirements during query execution.

deybelman commented 2 months ago

@Mathin21 Did you ever manage to figure this out? I'm struggling with the exact same problem.

dosubot[bot] commented 2 months ago

To address the "Output dict must have exactly one key" error without overriding the get_output function, you need to ensure that each component in your query pipeline returns a dictionary with exactly one key. This can be challenging if multiple outputs are required for each module. Here are some strategies to handle this:

  1. Wrap Multiple Outputs in a Single Dictionary: Instead of returning multiple keys directly, wrap them in a single dictionary under one key. For example:

    def _run_component(self, **kwargs: Any) -> Dict:
        """Run component."""
        output1, output2 = self.fn(**kwargs)
        # Wrap multiple outputs in a single dictionary
        return {"output": {"output1": output1, "output2": output2}}
    
    async def _arun_component(self, **kwargs: Any) -> Any:
        """Run component (async)."""
        if self.async_fn is None:
            return self._run_component(**kwargs)
        else:
            output1, output2 = await self.async_fn(**kwargs)
            # Wrap multiple outputs in a single dictionary
            return {"output": {"output1": output1, "output2": output2}}
  2. Unpack the Wrapped Outputs in Subsequent Components: When a subsequent component needs to access these outputs, unpack them from the single dictionary:

    def next_component_fn(self, **kwargs: Any) -> Any:
        wrapped_output = kwargs.get("output")
        output1 = wrapped_output["output1"]
        output2 = wrapped_output["output2"]
        # Process the outputs as needed
  3. Ensure Consistent Output Structure: Make sure that all components in the pipeline follow this pattern to maintain consistency and avoid the "Output dict must have exactly one key" error.

By wrapping multiple outputs in a single dictionary and unpacking them as needed, you can manage the flow of multiple outputs through the query pipeline without modifying the get_output function.
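
If a downstream module only needs one of the wrapped values, another option is to unpack inside the pipeline itself with a small function component. The sketch below assumes FnComponent from llama_index.core.query_pipeline; the function name, the "output1" key, and the module names in the commented wiring are illustrative, not part of the original pipeline:

from llama_index.core.query_pipeline import FnComponent

def unwrap_first_output(output: dict) -> str:
    # After get_output extracts the single "output" key from the wrapper,
    # the downstream module receives the inner dict {"output1": ..., "output2": ...};
    # return only the value the next module needs.
    return output["output1"]

unwrap_component = FnComponent(fn=unwrap_first_output)

# Illustrative wiring (module names are placeholders):
# qp.add_modules({"unwrap": unwrap_component})
# qp.add_link("producer_module", "unwrap")
# qp.add_link("unwrap", "consumer_module")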

Here is an example of how you can structure your query pipeline components:

# Define the query pipeline with retry agent
qp = QP(
    modules={
        "input": agent_input_component,
        "table_retriever": obj_retriever,
        "table_output_parser": table_parser_component,
        "text2sql_prompt": text2sql_prompt,
        "text2sql_llm": llm,
        "retry_prompt": retry_prompt,
        "sql_output_parser": sql_parser_component,
        "sql_retriever": sql_retriever,
        "response_synthesis_prompt": response_synthesis_prompt,
        "response_synthesis_llm": llm,
        "output_component": agent_output_component,
    },
    verbose=True,
)

# Add links and chains for the query pipeline
qp.add_link("input", "retry_prompt", src_key="input", dest_key="input")
qp.add_link("input", "retry_prompt", src_key="convo_history", dest_key="convo_history")
qp.add_link("input", "table_retriever")
qp.add_link("input", "table_output_parser", dest_key="query_str")
qp.add_link("table_retriever", "table_output_parser", dest_key="table_schema_objs")
qp.add_link("table_output_parser", "text2sql_prompt", dest_key="schema")
qp.add_chain(["text2sql_prompt", "text2sql_llm", "sql_output_parser", "sql_retriever"])
qp.add_link("sql_output_parser", "response_synthesis_prompt", dest_key="sql_query")
qp.add_link("sql_retriever", "response_synthesis_prompt", dest_key="context_str")
qp.add_link("input", "response_synthesis_prompt", dest_key="query_str")
qp.add_link("response_synthesis_prompt", "response_synthesis_llm")
qp.add_link("response_synthesis_llm", "output_component")

qp.add_chain([
    "retry_prompt",
    "sql_output_parser",
    "sql_retriever",
    "response_synthesis_prompt",
    "response_synthesis_llm",
    "agent_output_component",
])

By following this approach, you can handle multiple outputs within the constraints of the get_output function and ensure smooth execution of your query pipeline [1][2][3][4][5].
