run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

Query Pipeline generating random links #14877

Closed arnavroh45 closed 3 months ago

arnavroh45 commented 3 months ago

Bug Description

I am using the query pipeline from llama_index. My pipeline looks like the following, but for some reason the input and join modules are getting linked. What is the reason behind this? It is affecting my pipeline.

pipeline.add_link("input", "source1") pipeline.add_link("input", "source1") pipeline.add_link("input","source2")

pipeline.add_link("source1", "source3")

pipeline.add_link("source3", "source4")

pipeline.add_link("source2", "join", dest_key="1")

pipeline.add_link("source4", "join", dest_key="2")

Version

0.10.29

Steps to Reproduce

pipeline.add_link("input", "source1") pipeline.add_link("input", "source1") pipeline.add_link("input","source2")

pipeline.add_link("source1", "spurce3")

pipeline.add_link("source3", "source4")

pipeline.add_link("source2", "join", dest_key="1")

pipeline.add_link("source4", "join", dest_key="2")

Relevant Logs/Tracebacks

> Running module join with input:

dosubot[bot] commented 3 months ago

The seemingly random links in your pipeline configuration, which cause the input and join modules to appear linked, could be due to an incorrect or missing setup of the links between the modules. Specifically, the setup_links method in the PipelineConfigurator class is responsible for defining these connections.

To fix this issue, ensure that the setup_links method correctly defines the connections between the modules. Here is the relevant part of the code that sets up the links:

def setup_links(self) -> None:
    """Defines the connections (links) between the different pipeline modules."""
    # STAGE-1: SELECT subset of reasoning Modules.
    self.pipeline.add_link(
        "input", "select_prompt_template", src_key="task", dest_key="task"
    )
    self.pipeline.add_link(
        "input",
        "select_prompt_template",
        src_key="reasoning_modules",
        dest_key="reasoning_modules",
    )
    self.pipeline.add_link("select_prompt_template", "select_llm")

    # STAGE-1: ADAPT selected reasoning modules to the task.
    self.pipeline.add_link(
        "select_llm", "adapt_prompt_template", dest_key="selected_modules"
    )
    self.pipeline.add_link(
        "input", "adapt_prompt_template", src_key="task", dest_key="task"
    )
    self.pipeline.add_link("adapt_prompt_template", "adapt_llm")

    # STAGE-1: IMPLEMENT provides reasoning structure for the task.
    self.pipeline.add_link(
        "adapt_llm", "implement_prompt_template", dest_key="adapted_modules"
    )
    self.pipeline.add_link(
        "input", "implement_prompt_template", src_key="task", dest_key="task"
    )
    self.pipeline.add_link("implement_prompt_template", "implement_llm")

    # STAGE-2: Uses the generated reasoning structure for the task to generate an answer.
    self.pipeline.add_link(
        "implement_llm", "reasoning_prompt_template", dest_key="reasoning_structure"
    )
    self.pipeline.add_link(
        "input", "reasoning_prompt_template", src_key="task", dest_key="task"
    )
    self.pipeline.add_link("reasoning_prompt_template", "reasoning_llm")

Ensure that each add_link call correctly specifies the source and destination keys. Any incorrect or missing links can cause the modules to be linked incorrectly, leading to the issue you are experiencing.
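One way to check whether an unintended connection exists between "input" and "join" is to inspect the pipeline's DAG after adding your links. This is only a small sketch: it assumes pipeline.dag is the networkx graph that the add_node/add_edge calls populate, which is how the QueryPipeline code shown further down stores its edges.

# Sketch: list every edge that add_link has registered in the DAG.
for src, dest, data in pipeline.dag.edges(data=True):
    # Each edge carries the src_key/dest_key values passed to add_link.
    print(f"{src} -> {dest} (src_key={data.get('src_key')}, dest_key={data.get('dest_key')})")

# If no ("input", "join") edge is printed, the two modules are not directly linked.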

Additionally, you can refer to the QueryPipeline class from the LlamaIndex core for an example of how to properly set up and manage the links in a query pipeline:

class QueryPipeline(QueryComponent):
    """A query pipeline that can allow arbitrary chaining of different modules.

    A pipeline itself is a query component, and can be used as a module in another pipeline.
    """

    def __init__(
        self,
        callback_manager: Optional[CallbackManager] = None,
        chain: Optional[Sequence[CHAIN_COMPONENT_TYPE]] = None,
        modules: Optional[Dict[str, QUERY_COMPONENT_TYPE]] = None,
        links: Optional[List[Link]] = None,
        **kwargs: Any,
    ):
        super().__init__(
            callback_manager=callback_manager or CallbackManager([]),
            **kwargs,
        )
        self._init_graph(chain=chain, modules=modules, links=links)

    def _init_graph(
        self,
        chain: Optional[Sequence[CHAIN_COMPONENT_TYPE]] = None,
        modules: Optional[Dict[str, QUERY_COMPONENT_TYPE]] = None,
        links: Optional[List[Link]] = None,
    ) -> None:
        """Initialize graph."""
        if chain is not None:
            if modules is not None or links is not None:
                raise ValueError("Cannot specify both chain and modules/links in init.")
            self.add_chain(chain)
        elif modules is not None:
            self.add_modules(modules)
            if links is not None:
                for link in links:
                    self.add_link(**link.dict())

    def add_chain(self, chain: Sequence[CHAIN_COMPONENT_TYPE]) -> None:
        """Add a chain of modules to the pipeline."""
        module_keys: List[str] = []
        for module in chain:
            if isinstance(module, get_args(QUERY_COMPONENT_TYPE)):
                module_key = str(uuid.uuid4())
                self.add(module_key, cast(QUERY_COMPONENT_TYPE, module))
                module_keys.append(module_key)
            elif isinstance(module, str):
                module_keys.append(module)
            else:
                raise ValueError("Chain must be a sequence of modules or module keys.")
        for i in range(len(chain) - 1):
            self.add_link(src=module_keys[i], dest=module_keys[i + 1])

    def add_links(self, links: List[Link]) -> None:
        """Add links to the pipeline."""
        for link in links:
            if isinstance(link, Link):
                self.add_link(**link.dict())
            else:
                raise ValueError("Link must be of type `Link` or `ConditionalLinks`.")

    def add_modules(self, module_dict: Dict[str, QUERY_COMPONENT_TYPE]) -> None:
        """Add modules to the pipeline."""
        for module_key, module in module_dict.items():
            self.add(module_key, module)

    def add(self, module_key: str, module: QUERY_COMPONENT_TYPE) -> None:
        """Add a module to the pipeline."""
        if module_key in self.module_dict:
            raise ValueError(f"Module {module_key} already exists in pipeline.")
        if isinstance(module, ChainableMixin):
            module = module.as_query_component()
        self.module_dict[module_key] = cast(QueryComponent, module)
        self.dag.add_node(module_key)

    def add_link(
        self,
        src: str,
        dest: str,
        src_key: Optional[str] = None,
        dest_key: Optional[str] = None,
        condition_fn: Optional[Callable] = None,
        input_fn: Optional[Callable] = None,
    ) -> None:
        """Add a link between two modules."""
        if src not in self.module_dict:
            raise ValueError(f"Module {src} does not exist in pipeline.")
        self.dag.add_edge(
            src,
            dest,
            src_key=src_key,
            dest_key=dest_key,
            condition_fn=condition_fn,
            input_fn=input_fn,
        )

This example demonstrates how to initialize a query pipeline with modules and links, and how to manage the links to ensure they are set up correctly [1][2][3].
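For example, here is a minimal, hypothetical sketch of a pipeline declared with modules and Link objects up front, so every connection is explicit. It assumes the v0.10.x import path llama_index.core.query_pipeline and uses toy FnComponent functions in place of your real modules.

from llama_index.core.query_pipeline import (
    FnComponent,
    InputComponent,
    Link,
    QueryPipeline,
)

# Toy components standing in for your real modules.
source1 = FnComponent(fn=lambda input: f"s1({input})")
source2 = FnComponent(fn=lambda input: f"s2({input})")
join = FnComponent(fn=lambda a, b: f"join({a}, {b})")

pipeline = QueryPipeline(
    modules={
        "input": InputComponent(),
        "source1": source1,
        "source2": source2,
        "join": join,
    },
    links=[
        Link("input", "source1"),
        Link("input", "source2"),
        # dest_key tells the pipeline which of join's two inputs each source feeds.
        Link("source1", "join", dest_key="a"),
        Link("source2", "join", dest_key="b"),
    ],
)

print(pipeline.run(input="hello"))

Declaring the links as a list like this makes it easier to see at a glance that "input" and "join" have no direct edge between them.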

To continue talking to Dosu, mention @dosu.

arnavroh45 commented 3 months ago

I want to know: if I haven't set up a link between "input" and "join", then why are they running together? This is what I am getting when I keep verbose set to True:

> Running module join with input:

dosubot[bot] commented 3 months ago

The "input" and "join" modules are running together in your llama_index pipeline configuration because the QueryPipeline class allows for arbitrary chaining of different modules. This chaining can result in modules running together if they are linked in the pipeline's directed acyclic graph (DAG).

The QueryPipeline class initializes the graph and adds modules and links between them. If the "input" and "join" modules are part of the same chain or have links connecting them, they will run together.

Here is the relevant code that sets up these links:

def _init_graph(
        self,
        chain: Optional[Sequence[CHAIN_COMPONENT_TYPE]] = None,
        modules: Optional[Dict[str, QUERY_COMPONENT_TYPE]] = None,
        links: Optional[List[Link]] = None,
    ) -> None:
        """Initialize graph."""
        if chain is not None:
            if modules is not None or links is not None:
                raise ValueError("Cannot specify both chain and modules/links in init.")
            self.add_chain(chain)
        elif modules is not None:
            self.add_modules(modules)
            if links is not None:
                for link in links:
                    self.add_link(**link.dict())

def add_link(
        self,
        src: str,
        dest: str,
        src_key: Optional[str] = None,
        dest_key: Optional[str] = None,
        condition_fn: Optional[Callable] = None,
        input_fn: Optional[Callable] = None,
    ) -> None:
        """Add a link between two modules."""
        if src not in self.module_dict:
            raise ValueError(f"Module {src} does not exist in pipeline.")
        self.dag.add_edge(
            src,
            dest,
            src_key=src_key,
            dest_key=dest_key,
            condition_fn=condition_fn,
            input_fn=input_fn,
        )

Additionally, the QueryPipeline class includes methods to handle the inputs and outputs of modules, ensuring that the outputs of one module can be used as inputs to another. This is managed through the get_output and add_output_to_module_inputs functions, which facilitate the flow of data between modules:

def get_output(
    src_key: Optional[str],
    output_dict: Dict[str, Any],
) -> Any:
    """Add input to module deps inputs."""
    if src_key is None:
        if len(output_dict) != 1:
            raise ValueError("Output dict must have exactly one key.")
        output = next(iter(output_dict.values()))
    else:
        output = output_dict[src_key]
    return output

def add_output_to_module_inputs(
    dest_key: str,
    output: Any,
    module: QueryComponent,
    module_inputs: Dict[str, Any],
) -> None:
    """Add input to module deps inputs."""
    if dest_key is None:
        free_keys = module.free_req_input_keys
        if len(free_keys) != 1:
            raise ValueError(
                "Module input keys must have exactly one key if "
                "dest_key is not specified. Remaining keys: "
                f"in module: {free_keys}"
            )
        module_inputs[next(iter(free_keys))] = output
    else:
        module_inputs[dest_key] = output

These methods ensure that the modules are connected in the pipeline's DAG, leading to the observed behavior of the "input" and "join" modules running together [1][2][3].
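As a conceptual illustration (not library code, just a re-statement of the branching shown above): with src_key=None the single output value is taken as-is, and dest_key decides which input slot of the destination module receives it, which is why dest_key="1" and dest_key="2" matter for a two-input join.

# Conceptual sketch mirroring get_output / add_output_to_module_inputs above.
output_dict = {"output": "value produced by source2"}  # made-up single-key output

src_key = None
output = next(iter(output_dict.values())) if src_key is None else output_dict[src_key]

module_inputs = {}
dest_key = "1"                     # matches add_link("source2", "join", dest_key="1")
module_inputs[dest_key] = output   # join will receive the value under key "1"
print(module_inputs)               # {'1': 'value produced by source2'}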

To continue talking to Dosu, mention @dosu.

logan-markewich commented 3 months ago

@arnavroh45 that log doesn't mean that the input is connected to join. It's saying it's running the join component with some input data -- not specifically the input component

arnavroh45 commented 3 months ago

@logan-markewich If I print the module keys after the run state, I am getting ['input', 'join'] as the output. Why is that? I want only "input" to be the next module after the run state, and I want "join" to come into the picture later. I don't think there is a problem in how the links were added.

This is the code that I ran to print the module keys:

run_state = pipeline.get_run_state(input)
next_module_keys = pipeline.get_next_module_keys(run_state)
print(next_module_keys)