run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
35.84k stars 5.08k forks source link

[Question]: Response Synthesizer #16228

Open navaneethstabilix opened 6 days ago

navaneethstabilix commented 6 days ago

Question Validation

Question

How do I use a response synthesizer when multiple query engines are involved? Consider a setup with both a Neo4j DB and a SQL DB: depending on the question, the response must be generated from the appropriate one. Should I use a response synthesizer or a router?

dosubot[bot] commented 6 days ago

To handle multiple query engines like Neo4j and SQL, you should use the RouterQueryEngine to select the appropriate query engine based on the query. The RouterQueryEngine will choose the most suitable query engine and then use the Response Synthesizer to combine the responses if multiple query engines are selected.

Here is a summary of how to implement this:

  1. Define Events: Create events for selecting query engines and synthesizing responses.
  2. Workflow Steps:
    • Selector: Selects the appropriate query engine(s) based on the query.
    • Generate Responses: Uses the selected query engines to generate responses.
    • Synthesize Responses: Combines the responses from multiple query engines if necessary.

Here is the relevant code:

import asyncio
from typing import Any, List

from llama_index.core import Settings
from llama_index.core.base.response.schema import (
    RESPONSE_TYPE,
    AsyncStreamingResponse,
    PydanticResponse,
    Response,
)
from llama_index.core.bridge.pydantic import BaseModel
from llama_index.core.response_synthesizers import TreeSummarize
from llama_index.core.schema import QueryBundle
from llama_index.core.selectors.utils import get_selector_from_llm
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.llms.openai import OpenAI

class QueryEngineSelectionEvent(Event):
    """Carries the selector's decision about which query engine(s) to run.

    Attributes:
        selected_query_engines: The selector result (engine indices plus
            reasons) produced by ``selector.aselect``.
    """

    selected_query_engines: Any


class SynthesizeEvent(Event):
    """Carries the raw responses that still need to be combined.

    Attributes:
        result: One response per selected query engine.
        selected_query_engines: The selector result, forwarded so the final
            response can record why each engine was chosen.
    """

    result: List[RESPONSE_TYPE]
    selected_query_engines: Any


class RouterQueryEngineWorkflow(Workflow):
    """Route a query to one or more query engines and synthesize the answers.

    Steps:
        1. ``selector`` picks the engine(s) whose metadata best matches the query.
        2. ``generate_responses`` runs the chosen engine(s) (concurrently when
           several are selected).
        3. ``synthesize_responses`` combines multiple responses into one via a
           ``TreeSummarize`` summarizer.
    """

    @step
    async def selector(
        self, ctx: Context, ev: StartEvent
    ) -> QueryEngineSelectionEvent:
        """Select one or more query engines based on the query.

        Expects the StartEvent to carry: ``query``, ``llm``,
        ``query_engine_tools``, ``summarizer`` and ``select_multi``.
        """
        # Stash the inputs the later steps will need.
        await ctx.set("query", ev.get("query"))
        await ctx.set("llm", ev.get("llm"))
        await ctx.set("query_engine_tools", ev.get("query_engine_tools"))
        await ctx.set("summarizer", ev.get("summarizer"))

        # Prefer the LLM passed in the event; fall back to the global
        # Settings.llm when none was provided.
        llm = ev.get("llm") or Settings.llm
        select_multiple_query_engines = ev.get("select_multi")
        query = ev.get("query")
        query_engine_tools = ev.get("query_engine_tools")
        selector = get_selector_from_llm(
            llm, is_multi=select_multiple_query_engines
        )

        # The selector chooses based on each tool's metadata (name/description).
        query_engines_metadata = [
            query_engine.metadata for query_engine in query_engine_tools
        ]

        selected_query_engines = await selector.aselect(
            query_engines_metadata, query
        )

        return QueryEngineSelectionEvent(
            selected_query_engines=selected_query_engines
        )

    @step
    async def generate_responses(
        self, ctx: Context, ev: QueryEngineSelectionEvent
    ) -> SynthesizeEvent:
        """Run the selected query engine(s) and collect their responses."""
        query = await ctx.get("query", default=None)
        selected_query_engines = ev.selected_query_engines
        query_engine_tools = await ctx.get("query_engine_tools")

        query_engines = [engine.query_engine for engine in query_engine_tools]

        if len(selected_query_engines.selections) > 1:
            # Query every selected engine concurrently.
            tasks = []
            for selected_query_engine in selected_query_engines.selections:
                query_engine = query_engines[selected_query_engine.index]
                tasks.append(query_engine.aquery(query))

            response_generated = await asyncio.gather(*tasks)
        else:
            # Single selection: keep the list shape so the next step is uniform.
            query_engine = query_engines[
                selected_query_engines.selections[0].index
            ]
            response_generated = [await query_engine.aquery(query)]

        return SynthesizeEvent(
            result=response_generated,
            selected_query_engines=selected_query_engines,
        )

    async def acombine_responses(
        self,
        summarizer: TreeSummarize,
        responses: List[RESPONSE_TYPE],
        query_bundle: QueryBundle,
    ) -> RESPONSE_TYPE:
        """Async combine multiple responses from sub-engines into one.

        Concatenates the source nodes of all responses and summarizes the
        response texts with ``summarizer``. The return type mirrors the
        summarizer's output (str -> Response, BaseModel -> PydanticResponse,
        otherwise an async stream).
        """
        response_strs = []
        source_nodes = []
        for response in responses:
            # Streaming / pydantic responses must be resolved before their
            # source nodes and text can be read.
            if isinstance(
                response, (AsyncStreamingResponse, PydanticResponse)
            ):
                response_obj = await response.aget_response()
            else:
                response_obj = response
            source_nodes.extend(response_obj.source_nodes)
            response_strs.append(str(response))

        summary = await summarizer.aget_response(
            query_bundle.query_str, response_strs
        )

        if isinstance(summary, str):
            return Response(response=summary, source_nodes=source_nodes)
        elif isinstance(summary, BaseModel):
            return PydanticResponse(
                response=summary, source_nodes=source_nodes
            )
        else:
            return AsyncStreamingResponse(
                response_gen=summary, source_nodes=source_nodes
            )

    @step
    async def synthesize_responses(
        self, ctx: Context, ev: SynthesizeEvent
    ) -> StopEvent:
        """Combine the generated responses (if several) and finish the run."""
        response_generated = ev.result
        query = await ctx.get("query", default=None)
        summarizer = await ctx.get("summarizer")
        selected_query_engines = ev.selected_query_engines

        if len(response_generated) > 1:
            response = await self.acombine_responses(
                summarizer, response_generated, QueryBundle(query_str=query)
            )
        else:
            response = response_generated[0]

        # Record the selector's reasoning on the final response for debugging.
        response.metadata = response.metadata or {}
        response.metadata["selector_result"] = selected_query_engines

        return StopEvent(result=response)

Alternatively, for complex queries, you can use the SubQuestionQueryEngine to break down the query into sub-questions and synthesize the responses.

Example:

# Sub-question decomposition example: the generator splits a complex query
# into sub-questions, one tool per underlying engine answers them, and the
# synthesizer merges the partial answers.
from llama_index.legacy.query_engine.sub_question_query_engine import SubQuestionQueryEngine
from llama_index.legacy.question_gen.openai_generator import OpenAIQuestionGenerator
from llama_index.legacy.response_synthesizers import get_response_synthesizer
from llama_index.legacy.service_context import ServiceContext
from llama_index.legacy.tools.query_engine import QueryEngineTool

# Shared context supplies the LLM for both the generator and the synthesizer.
service_context = ServiceContext.from_defaults()
question_gen = OpenAIQuestionGenerator.from_defaults(llm=service_context.llm)
response_synthesizer = get_response_synthesizer(service_context=service_context)

# Wrap each (metadata, engine) pair as a tool the engine can dispatch to.
query_engine_tools = [
    QueryEngineTool(metadata=metadata, query_engine=query_engine)
    for metadata, query_engine in your_query_engines
]

# Assemble the engine; use_async=True lets sub-questions run concurrently.
sub_question_query_engine = SubQuestionQueryEngine(
    question_gen=question_gen,
    response_synthesizer=response_synthesizer,
    query_engine_tools=query_engine_tools,
    verbose=True,
    use_async=True,
)

# Process a query end to end.
response = sub_question_query_engine.query(QueryBundle(query_str="Your complex query"))

In summary, use the RouterQueryEngine to select the appropriate query engine(s) and the Response Synthesizer to combine responses if multiple query engines are selected. For complex queries, consider using the SubQuestionQueryEngine [1][2][3].

To continue talking to Dosu, mention @dosu.