Open ihgumilar opened 1 month ago
Yes, you can process the document summary index concurrently using async to reduce the processing time. The provided code includes an asynchronous method `abuild_index_from_nodes` that uses `asyncio.gather` to run tasks concurrently.
Here is the relevant part of the code that demonstrates this:
```python
async def abuild_index_from_nodes(
    self,
    index_graph: IndexGraph,
    cur_node_ids: Dict[int, str],
    all_node_ids: Dict[int, str],
    level: int = 0,
) -> IndexGraph:
    """Consolidates chunks recursively, in a bottoms-up fashion."""
    if len(cur_node_ids) <= self.num_children:
        index_graph.root_nodes = cur_node_ids
        return index_graph

    indices, cur_nodes_chunks, text_chunks = self._prepare_node_and_text_chunks(
        cur_node_ids
    )

    with self._service_context.callback_manager.event(
        CBEventType.TREE, payload={EventPayload.CHUNKS: text_chunks}
    ) as event:
        text_chunks_progress = get_tqdm_iterable(
            text_chunks,
            show_progress=self._show_progress,
            desc="Generating summaries",
        )
        tasks = [
            self._service_context.llm.apredict(
                self.summary_prompt, context_str=text_chunk
            )
            for text_chunk in text_chunks_progress
        ]
        outputs: List[Tuple[str, str]] = await asyncio.gather(*tasks)
        summaries = [output[0] for output in outputs]
        self._service_context.llama_logger.add_log(
            {"summaries": summaries, "level": level}
        )
        event.on_end(payload={"summaries": summaries, "level": level})

    new_node_dict = self._construct_parent_nodes(
        index_graph, indices, cur_nodes_chunks, summaries
    )
    all_node_ids.update(new_node_dict)
    index_graph.root_nodes = new_node_dict

    if len(new_node_dict) <= self.num_children:
        return index_graph
    else:
        return await self.abuild_index_from_nodes(
            index_graph, new_node_dict, all_node_ids, level=level + 1
        )
```
This method uses `asyncio.gather` to run the summary generation tasks concurrently, which can help reduce the overall processing time [1].
To continue the conversation, mention @dosu.
I don't see any class that is using DocumentSummaryIndex
The `DocumentSummaryIndex` class is defined in the `llama_index.legacy.indices.document_summary.base` module. You can import it using the following statement:

```python
from llama_index.legacy.indices.document_summary.base import DocumentSummaryIndex
```
If you are looking for an alternative approach, you can use the `IndexDocumentSummary` class, which is defined in the `llama_index.core.data_structs.document_summary` module. This class provides a data structure for managing document summaries and their associated nodes. Here is a brief overview of the `IndexDocumentSummary` class:
```python
from dataclasses import dataclass, field
from typing import Dict, List

from llama_index.core.data_structs.data_structs import IndexStruct
from llama_index.core.data_structs.struct_type import IndexStructType
from llama_index.core.schema import BaseNode


@dataclass
class IndexDocumentSummary(IndexStruct):
    """A simple struct containing a mapping from summary node_id to doc node_ids.

    Also mapping vice versa.
    """

    summary_id_to_node_ids: Dict[str, List[str]] = field(default_factory=dict)
    node_id_to_summary_id: Dict[str, str] = field(default_factory=dict)
    doc_id_to_summary_id: Dict[str, str] = field(default_factory=dict)

    def add_summary_and_nodes(
        self,
        summary_node: BaseNode,
        nodes: List[BaseNode],
    ) -> str:
        """Add node and summary."""
        summary_id = summary_node.node_id
        ref_doc_id = summary_node.ref_doc_id
        if ref_doc_id is None:
            raise ValueError(
                "ref_doc_id of node cannot be None when building a document "
                "summary index"
            )
        self.doc_id_to_summary_id[ref_doc_id] = summary_id

        for node in nodes:
            node_id = node.node_id
            if summary_id not in self.summary_id_to_node_ids:
                self.summary_id_to_node_ids[summary_id] = []
            self.summary_id_to_node_ids[summary_id].append(node_id)
            self.node_id_to_summary_id[node_id] = summary_id
        return summary_id

    @property
    def summary_ids(self) -> List[str]:
        """Get summary ids."""
        return list(self.summary_id_to_node_ids.keys())

    def delete(self, doc_id: str) -> None:
        """Delete a document and its nodes."""
        summary_id = self.doc_id_to_summary_id[doc_id]
        del self.doc_id_to_summary_id[doc_id]
        node_ids = self.summary_id_to_node_ids[summary_id]
        for node_id in node_ids:
            del self.node_id_to_summary_id[node_id]
        del self.summary_id_to_node_ids[summary_id]

    def delete_nodes(self, node_ids: List[str]) -> None:
        """Delete nodes."""
        for node_id in node_ids:
            summary_id = self.node_id_to_summary_id[node_id]
            self.summary_id_to_node_ids[summary_id].remove(node_id)
            del self.node_id_to_summary_id[node_id]

    @classmethod
    def get_type(cls) -> IndexStructType:
        """Get type."""
        return IndexStructType.DOCUMENT_SUMMARY
```
This class allows you to create a summary for each document and manage the relationships between summaries and document nodes [1][2][3][4][5].
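The bookkeeping performed by `add_summary_and_nodes` can be illustrated with plain dictionaries. This is a simplified stand-in for `IndexDocumentSummary`, not the LlamaIndex implementation; the ids used below are made up for illustration:

```python
# Simplified stand-in for IndexDocumentSummary's three mappings
summary_id_to_node_ids: dict[str, list[str]] = {}
node_id_to_summary_id: dict[str, str] = {}
doc_id_to_summary_id: dict[str, str] = {}


def add_summary_and_nodes(doc_id: str, summary_id: str, node_ids: list[str]) -> str:
    # Link the source document to its summary, and link each chunk node
    # to that summary in both directions
    doc_id_to_summary_id[doc_id] = summary_id
    for node_id in node_ids:
        summary_id_to_node_ids.setdefault(summary_id, []).append(node_id)
        node_id_to_summary_id[node_id] = summary_id
    return summary_id


add_summary_and_nodes("doc-1", "summary-1", ["node-1", "node-2"])
print(summary_id_to_node_ids["summary-1"])
print(node_id_to_summary_id["node-2"])
```

Keeping all three mappings in sync is what lets the real class answer both "which nodes back this summary?" and "which summary covers this node?" in constant time.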
But how can we build the `DocumentSummaryIndex` concurrently and then use it as a query engine?
To build the `DocumentSummaryIndex` concurrently and then use it as a query engine, you can follow the example provided by the `QASummaryQueryEngineBuilder` class. Here is a step-by-step guide based on the provided code:

1. **Initialize the builder**: Create an instance of `QASummaryQueryEngineBuilder` with the necessary contexts and texts.
2. **Build from documents**: Use the `build_from_documents` method to parse nodes, ingest them, and build the indices.
3. **Create query engines**: Convert the indices to query engines.
4. **Return a router query engine**: Combine the query engines into a `RouterQueryEngine`.
Here is the code to achieve this:
```python
from llama_index.core.schema import Document
from llama_index.core.service_context import ServiceContext
from llama_index.core.storage.storage_context import StorageContext
from llama_index.core.composability.joint_qa_summary import QASummaryQueryEngineBuilder

# Initialize the builder with optional custom contexts and texts
builder = QASummaryQueryEngineBuilder(
    storage_context=StorageContext.from_defaults(),
    service_context=ServiceContext.from_defaults(),
    summary_text="Custom summary text",
    qa_text="Custom QA text",
)

# Assume `documents` is a list of Document objects you want to index
documents = [...]  # Replace with your actual documents

# Build the query engine from documents
query_engine = builder.build_from_documents(documents)

# Now `query_engine` can be used to perform queries
```
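Conceptually, the resulting `RouterQueryEngine` dispatches each query to either the summary index or the vector (QA) index. The toy sketch below illustrates that selection logic with hypothetical stand-in engines; the real router uses an LLM-based selector rather than the crude keyword check shown here:

```python
def summary_engine(query: str) -> str:
    # Stand-in for the summary index query engine
    return "summary answer"


def qa_engine(query: str) -> str:
    # Stand-in for the vector-index QA engine
    return "qa answer"


def route(query: str) -> str:
    # The real RouterQueryEngine asks an LLM selector which engine fits
    # the query; this keyword check is purely for illustration
    if "summar" in query.lower():
        return summary_engine(query)
    return qa_engine(query)


print(route("Give me a summary of the document"))
print(route("What year was the city founded?"))
```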
Additionally, you can use the `DocumentSummaryIndexLLMRetriever` or `DocumentSummaryIndexEmbeddingRetriever` classes to retrieve nodes from the `DocumentSummaryIndex`. These classes provide methods to retrieve relevant summaries using either LLM calls or embeddings.

Here is an example of how to use `DocumentSummaryIndexLLMRetriever`:
```python
from llama_index.core.indices.document_summary.retrievers import (
    DocumentSummaryIndexLLMRetriever,
)
from llama_index.core.indices.document_summary.base import DocumentSummaryIndex
from llama_index.core.schema import QueryBundle

# Assume `index` is an instance of DocumentSummaryIndex
index = DocumentSummaryIndex(...)

# Initialize the retriever; pass a concrete LLM instance (the LLM base
# class is abstract and cannot be instantiated directly)
retriever = DocumentSummaryIndexLLMRetriever(
    index=index,
    llm=...,  # e.g. an OpenAI(...) instance
)

# Perform a query via the public retrieve() method
query_bundle = QueryBundle(query_str="Your query here")
results = retriever.retrieve(query_bundle)
```
This code initializes the `QASummaryQueryEngineBuilder`, builds the indices from the provided documents, and returns a `RouterQueryEngine` that can be used for querying. It also demonstrates how to use `DocumentSummaryIndexLLMRetriever` to retrieve nodes from the `DocumentSummaryIndex` [1][2][3].
@dosu is there a way to build this in async?

```python
query_engine = builder.build_from_documents(documents)
```
Yes, you can build the `query_engine` asynchronously with the `QASummaryQueryEngineBuilder` class. Note that `build_from_documents` is a synchronous method, so awaiting it directly would raise a `TypeError`; instead, offload it to a worker thread with `asyncio.to_thread`:

```python
import asyncio

from llama_index.core.service_context import ServiceContext
from llama_index.core.storage.storage_context import StorageContext
from llama_index.core.composability.joint_qa_summary import QASummaryQueryEngineBuilder


async def async_build_from_documents(builder, documents):
    # build_from_documents is synchronous; run it in a worker thread so
    # the event loop is not blocked while the indices are built
    return await asyncio.to_thread(builder.build_from_documents, documents)


# Usage
async def main():
    builder = QASummaryQueryEngineBuilder(
        storage_context=StorageContext.from_defaults(),
        service_context=ServiceContext.from_defaults(),
        summary_text="Custom summary text",
        qa_text="Custom QA text",
    )
    documents = [...]  # Your list of documents
    query_engine = await async_build_from_documents(builder, documents)
    print(query_engine)


# Run the async main function
asyncio.run(main())
```

In this example, `async_build_from_documents` wraps the synchronous `build_from_documents` call in `asyncio.to_thread`, so the heavy work runs in a worker thread while the event loop stays responsive. The `main` function initializes the `QASummaryQueryEngineBuilder`, builds the query engine from the provided documents asynchronously, and prints the result.
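The thread-offload pattern can be demonstrated with any blocking function. In the sketch below, `slow_build` is a hypothetical placeholder for `build_from_documents`, used only so the example runs without LlamaIndex installed:

```python
import asyncio
import time


def slow_build(documents: list[str]) -> str:
    # Stand-in for a blocking, I/O- or CPU-heavy build call
    time.sleep(0.05)
    return f"engine over {len(documents)} docs"


async def main() -> str:
    # Offload the blocking call to a worker thread; the event loop
    # stays free to run other coroutines in the meantime
    return await asyncio.to_thread(slow_build, ["doc1", "doc2"])


engine = asyncio.run(main())
print(engine)
```

`asyncio.to_thread` (Python 3.9+) is preferable to awaiting the call directly, which only works for actual coroutine functions.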
Again coming back to the original question: how can I make the following code async, particularly for building `doc_summary_index`?
```python
# default mode of building the index
response_synthesizer = get_response_synthesizer(
    response_mode="tree_summarize", use_async=True
)
doc_summary_index = DocumentSummaryIndex.from_documents(
    city_docs,
    llm=chatgpt,
    transformations=[splitter],
    response_synthesizer=response_synthesizer,
    show_progress=True,
)
```
To build `doc_summary_index` asynchronously, you can modify the code to use the asynchronous utilities of the LlamaIndex framework:

1. **Run the summary tasks concurrently**: `asyncio.gather` lets you await a list of coroutines concurrently.
2. **Make the index-building steps asynchronous**: Ensure that the methods used for building the index are coroutines.

Here is a sketch of the modified code:

```python
import asyncio

from llama_index.legacy.indices.document_summary.base import DocumentSummaryIndex
from llama_index.legacy.response_synthesizers import get_response_synthesizer


# Define an asynchronous function to build the document summary index
async def async_build_doc_summary_index(
    documents, llm, transformations, response_synthesizer, show_progress
):
    # Placeholder coroutine: your logic to build a summary for a single document
    async def build_doc_summary(doc):
        ...

    # Run the per-document summary tasks concurrently
    tasks = [build_doc_summary(doc) for doc in documents]
    await asyncio.gather(*tasks)

    # Assemble the index from the prepared summaries (sketch only; the real
    # constructor also needs the nodes/summaries produced above)
    doc_summary_index = DocumentSummaryIndex(
        response_synthesizer=response_synthesizer,
        show_progress=show_progress,
    )
    return doc_summary_index


# Usage
async def main():
    response_synthesizer = get_response_synthesizer(
        response_mode="tree_summarize", use_async=True
    )
    city_docs = [...]  # Your list of documents
    splitter = ...  # Your splitter transformation
    chatgpt = ...  # Your LLM instance
    doc_summary_index = await async_build_doc_summary_index(
        city_docs,
        llm=chatgpt,
        transformations=[splitter],
        response_synthesizer=response_synthesizer,
        show_progress=True,
    )
    print(doc_summary_index)


# Run the async main function
asyncio.run(main())
```

In this example:

- `async_build_doc_summary_index` is an asynchronous function that builds the document summary index.
- `asyncio.gather` is used to run the summary-building tasks concurrently.
- The `main` function initializes the necessary components and builds the document summary index asynchronously.

Note also that passing `use_async=True` to the response synthesizer, as in your snippet, should already make the underlying summary LLM calls concurrent inside `DocumentSummaryIndex.from_documents`. This approach leverages the asynchronous capabilities of the LlamaIndex framework to improve performance when building `doc_summary_index` [1][2].
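If you want to cap concurrency, for example to respect LLM rate limits as a `workers` parameter would, `asyncio.gather` can be combined with a semaphore. This is a generic asyncio pattern, not a LlamaIndex API; `summarize` below is a hypothetical stand-in for a per-document LLM call:

```python
import asyncio


async def summarize(doc: str, sem: asyncio.Semaphore) -> str:
    # At most `workers` coroutines execute this body at the same time
    async with sem:
        await asyncio.sleep(0.01)  # stand-in for an LLM call
        return f"summary of {doc}"


async def summarize_with_limit(docs: list[str], workers: int = 4) -> list[str]:
    # The semaphore bounds in-flight calls while gather preserves order
    sem = asyncio.Semaphore(workers)
    return await asyncio.gather(*(summarize(d, sem) for d in docs))


results = asyncio.run(summarize_with_limit([f"doc-{i}" for i in range(10)]))
print(len(results))
```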
Question Validation
Question
Hi,
I am trying to build a document summary index following this example:
https://docs.llamaindex.ai/en/v0.10.17/examples/index_structs/doc_summary/DocSummary.html
However, it takes some time. Is there a way to process it concurrently using async?
Thanks, IG