run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Bug]: pipeline arun does not parallelize query pipeline #14144

Closed: ootkin closed this issue 1 week ago

ootkin commented 3 months ago

Bug Description

I'm trying to optimize my query pipeline with async/parallel calls using pipeline.arun.

However, if I look at my trace in Langfuse, it seems there are no parallel calls: [Langfuse trace screenshot]

Version

0.10.43

Steps to Reproduce

import asyncio

from llama_index.core.query_pipeline import QueryPipeline, InputComponent, ArgPackComponent
from llama_index.llms.openai import OpenAI
from llama_index.core.response_synthesizers import get_response_synthesizer
from llama_index.core.postprocessor.rankGPT_rerank import RankGPTRerank

# index_1 and index_2 are existing indexes
retriever_1 = index_1.as_retriever()
retriever_2 = index_2.as_retriever()
synthesizer = get_response_synthesizer()
reranker_1 = RankGPTRerank(llm=OpenAI(model='gpt-3.5-turbo'), top_n=3)
reranker_2 = RankGPTRerank(llm=OpenAI(model='gpt-3.5-turbo'), top_n=3)
join = ArgPackComponent()

p = QueryPipeline()
p.add_modules({
    "input": InputComponent(),
    "retriever_1": retriever_1,
    "retriever_2": retriever_2,
    "reranker_1": reranker_1,
    "reranker_2": reranker_2,
    "synthesizer": synthesizer,
    "join": join
})

p.add_link("input", "retriever_1", src_key="query_str")
p.add_link("input", "retriever_2", src_key="query_str")

p.add_link("retriever_1", "reranker_1", dest_key="nodes")
p.add_link("retriever_2", "reranker_2", dest_key="nodes")
p.add_link("input", "reranker_1", src_key="query_str", dest_key="query_str")
p.add_link("input", "reranker_2", src_key="query_str", dest_key="query_str")

p.add_link("reranker_1", "join", dest_key="reranker_1_nodes")
p.add_link("reranker_2", "join", dest_key="reranker_2_nodes")

p.add_link("join", "synthesizer", dest_key="nodes")
p.add_link("input", "synthesizer", src_key="query_str", dest_key="query_str")

async def run():
    output = await p.arun(query_str="my query")
    print(output)

if __name__ == "__main__":
    asyncio.run(run())

Relevant Logs/Tracebacks

No response

dosubot[bot] commented 3 months ago

To ensure that your query pipeline is optimized for async/parallel execution using pipeline.arun, you need to verify a few key aspects:

  1. Correct Setup of Asynchronous Methods: Ensure that all components in your pipeline support asynchronous operations. For example, the RankGPTRerank component should use asynchronous methods like apostprocess_nodes.

  2. Proper Use of arun Method: The arun method should be correctly implemented to handle asynchronous tasks. It uses asyncio.gather to run independent tasks concurrently (see the sketch after this list).

  3. Debugging and Tracing: Utilize the provided logging and debugging tools to trace the execution of asynchronous tasks.
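
To make point 2 concrete, here is a minimal, self-contained sketch using plain asyncio (not LlamaIndex-specific; the component names are placeholders) showing that asyncio.gather only yields a speedup when the awaited calls are genuinely asynchronous:

import asyncio
import time

async def fake_component(name: str) -> str:
    # Stands in for an async retriever or reranker call that does real I/O.
    await asyncio.sleep(1)
    return name

async def main():
    start = time.perf_counter()
    # Both branches run concurrently: total wall time is ~1s, not ~2s.
    results = await asyncio.gather(
        fake_component("retriever_1"),
        fake_component("retriever_2"),
    )
    print(results, f"elapsed: {time.perf_counter() - start:.1f}s")

asyncio.run(main())

If a component only implements a synchronous method, the event loop blocks on it and the branches execute one after another, which would match the serial trace seen in Langfuse.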

Here is a refined version of your setup with these considerations:

import asyncio
from llama_index.core.query_pipeline import QueryPipeline, InputComponent, ArgPackComponent
from llama_index.llms.openai import OpenAI
from llama_index.core.response_synthesizers import get_response_synthesizer
from llama_index.core.postprocessor.rankGPT_rerank import RankGPTRerank

# Initialize components
retriever_1 = index_1.as_retriever()
retriever_2 = index_2.as_retriever()
synthesizer = get_response_synthesizer()
reranker_1 = RankGPTRerank(llm=OpenAI(model='gpt-3.5-turbo'), top_n=3)
reranker_2 = RankGPTRerank(llm=OpenAI(model='gpt-3.5-turbo'), top_n=3)
join = ArgPackComponent()

# Construct query pipeline
p = QueryPipeline(verbose=True)
p.add_modules({
    "input": InputComponent(),
    "retriever_1": retriever_1,
    "retriever_2": retriever_2,
    "reranker_1": reranker_1,
    "reranker_2": reranker_2,
    "synthesizer": synthesizer,
    "join": join
})

# Add links
p.add_link("input", "retriever_1", src_key="query_str")
p.add_link("input", "retriever_2", src_key="query_str")
p.add_link("retriever_1", "reranker_1", dest_key="nodes")
p.add_link("retriever_2", "reranker_2", dest_key="nodes")
p.add_link("input", "reranker_1", src_key="query_str", dest_key="query_str")
p.add_link("input", "reranker_2", src_key="query_str", dest_key="query_str")
p.add_link("reranker_1", "join", dest_key="reranker_1_nodes")
p.add_link("reranker_2", "join", dest_key="reranker_2_nodes")
p.add_link("join", "synthesizer", dest_key="nodes")
p.add_link("input", "synthesizer", src_key="query_str", dest_key="query_str")

# Run the query asynchronously
async def run():
    output = await p.arun(query_str="my query")
    print(output)

if __name__ == "__main__":
    asyncio.run(run())

Debugging and Tracing

Enable debug logging to trace the execution:

import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

By following these steps, you should be able to achieve parallel execution in your query pipeline [1][2][3].

To continue talking to Dosu, mention @dosu.

ootkin commented 3 months ago

@dosu I'm trying to find apostprocess_nodes in the llama_index documentation, but I cannot find anything.

dosubot[bot] commented 3 months ago

The RankGPTRerank component in the llama_index library does not have an apostprocess_nodes method. Instead, make sure your pipeline components are set up for asynchronous execution by following the LlamaIndex documentation example for query pipelines with async/parallel execution.
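
If you want to verify this yourself, a small introspection sketch (plain Python attribute lookup; nothing is assumed beyond the import path already used above) shows whether a class defines its own apostprocess_nodes rather than inheriting one:

from llama_index.core.postprocessor.rankGPT_rerank import RankGPTRerank

# True only if RankGPTRerank itself defines the method in its class body;
# False if the name is absent or only inherited from a base class.
print("apostprocess_nodes" in vars(RankGPTRerank))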

Here is how you can modify your code snippet to ensure asynchronous execution:

import asyncio
from llama_index.core.query_pipeline import QueryPipeline, InputComponent, ArgPackComponent
from llama_index.llms.openai import OpenAI
from llama_index.core.response_synthesizers import get_response_synthesizer
from llama_index.core.postprocessor.rankGPT_rerank import RankGPTRerank

# Initialize components
retriever_1 = index_1.as_retriever()
retriever_2 = index_2.as_retriever()
synthesizer = get_response_synthesizer()
reranker_1 = RankGPTRerank(llm=OpenAI(model='gpt-3.5-turbo'), top_n=3)
reranker_2 = RankGPTRerank(llm=OpenAI(model='gpt-3.5-turbo'), top_n=3)
join = ArgPackComponent()

# Construct query pipeline
p = QueryPipeline(verbose=True)
p.add_modules({
    "input": InputComponent(),
    "retriever_1": retriever_1,
    "retriever_2": retriever_2,
    "reranker_1": reranker_1,
    "reranker_2": reranker_2,
    "synthesizer": synthesizer,
    "join": join
})

# Add links
p.add_link("input", "retriever_1", src_key="query_str")
p.add_link("input", "retriever_2", src_key="query_str")
p.add_link("retriever_1", "reranker_1", dest_key="nodes")
p.add_link("retriever_2", "reranker_2", dest_key="nodes")
p.add_link("input", "reranker_1", src_key="query_str", dest_key="query_str")
p.add_link("input", "reranker_2", src_key="query_str", dest_key="query_str")
p.add_link("reranker_1", "join", dest_key="reranker_1_nodes")
p.add_link("reranker_2", "join", dest_key="reranker_2_nodes")
p.add_link("join", "synthesizer", dest_key="nodes")
p.add_link("input", "synthesizer", src_key="query_str", dest_key="query_str")

# Run the query asynchronously
async def run():
    output = await p.arun(query_str="my query")
    print(output)

if __name__ == "__main__":
    asyncio.run(run())

This setup ensures that your query pipeline is correctly configured for asynchronous execution, similar to the example provided in the LlamaIndex documentation [1].
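
As a complementary check that does not depend on Langfuse, you can time the run (a minimal sketch reusing p and asyncio from the snippet above; if the two branches truly overlap, wall time should approach the slower branch rather than the sum of both):

import time

async def timed_run():
    start = time.perf_counter()
    output = await p.arun(query_str="my query")
    print(output)
    print(f"elapsed: {time.perf_counter() - start:.2f}s")

asyncio.run(timed_run())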

To continue talking to Dosu, mention @dosu.