run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Question]: Implementing randomized Azure resources for indexing with VectorStoreIndex in asynchronous mode #13980

Open marcodgiudice opened 3 weeks ago

marcodgiudice commented 3 weeks ago

Question

I am currently hitting rate limits while indexing documents using VectorStoreIndex in asynchronous mode with 4 workers. The embeddings come from a text-embedding-ada-002 model deployed on Azure. Because a single Azure OpenAI resource is rate limited, I would like to distribute the requests across multiple Azure OpenAI resources where this model is deployed.

Objective: To avoid rate limiting, I want to dynamically select a different Azure resource for each of the asynchronous jobs that create the index, rather than using a single fixed resource.

Current Method: I already have a method set_openai_resource which sets the Azure resource for the embedding model:

def set_openai_resource(self, allowed_resources_list):
    [...]
    self.embed_model = AzureOpenAIEmbedding(
        model=self.azure_model_embedding,
        azure_endpoint=api_base,
        azure_deployment=self.azure_deployment_embedding,
        api_key=self.openai.api_key,
        api_base=self.openai.api_base,
        api_type=self.openai.api_type,
        api_version=self.openai.api_version,
        max_retries=0,
        embed_batch_size=1,
        num_workers=4
    )

Indexing Initialization:

index = VectorStoreIndex(nodes=nodes, storage_context=self.storage_context, use_async=True)

Question: How can I modify my setup to randomly select one of the available Azure resources for each asynchronous job to balance the load and avoid hitting the rate limits?

Any guidance or examples on how to achieve this would be greatly appreciated.

Thank you!

dosubot[bot] commented 3 weeks ago

Hey there, @marcodgiudice! I'm here to help you with any bugs, questions, or contributions you might have. Let's tackle this together!

To modify your setup to randomly select one of the available Azure resources for each asynchronous job, you can implement a function that randomly picks an Azure resource from a list of available resources. You can then use this function to set the azure_endpoint and azure_deployment parameters for each job.

Here's an example of how you can achieve this:

  1. Define a list of available Azure resources.
  2. Implement a function to randomly select a resource.
  3. Use the selected resource to configure the AzureOpenAIEmbedding instance.

Here's a code snippet to illustrate this:

import random

from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding

# List of available Azure resources
azure_resources = [
    {
        "azure_endpoint": "https://resource1.openai.azure.com/",
        "azure_deployment": "deployment1"
    },
    {
        "azure_endpoint": "https://resource2.openai.azure.com/",
        "azure_deployment": "deployment2"
    },
    # Add more resources as needed
]

def get_random_azure_resource() -> dict:
    """Pick one of the configured Azure OpenAI resources at random."""
    return random.choice(azure_resources)

# Example usage
selected_resource = get_random_azure_resource()

# Note: azure_endpoint already identifies the resource, so a separate
# api_base/api_type is not needed here.
azure_openai_instance = AzureOpenAIEmbedding(
    model="text-embedding-ada-002",
    azure_endpoint=selected_resource["azure_endpoint"],
    azure_deployment=selected_resource["azure_deployment"],
    api_key="your-api-key",
    api_version="your-api-version",
    max_retries=0,
    embed_batch_size=1,
    num_workers=4
)
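
To make the index actually use this instance for the whole run, it can be passed straight to the index build. A minimal sketch, assuming a llama_index version where VectorStoreIndex accepts an embed_model argument, and reusing the nodes and storage_context from the question:

from llama_index.core import VectorStoreIndex

index = VectorStoreIndex(
    nodes=nodes,
    storage_context=storage_context,
    embed_model=azure_openai_instance,  # the randomly configured instance above
    use_async=True,
)

This gives one random resource per indexing run; the next step shows how to rotate resources within a single run.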

To integrate this into the VectorStoreIndex class, you need to modify the _async_add_nodes_to_index method. This is where the asynchronous addition of nodes to the vector store happens. You can introduce your logic for random selection of Azure resources within this method.

Here is a snippet of the _async_add_nodes_to_index method where you can integrate the random selection of Azure resources:

import random
from typing import Any, Sequence

import pymongo

from llama_index.core.data_structs.data_structs import IndexDict
from llama_index.core.schema import BaseNode, ImageNode, IndexNode
from llama_index.core.utils import iter_batch
from llama_index.vector_stores.azurecosmosmongo import AzureCosmosDBMongoDBVectorSearch

async def _async_add_nodes_to_index(
    self,
    index_struct: IndexDict,
    nodes: Sequence[BaseNode],
    show_progress: bool = False,
    **insert_kwargs: Any,
) -> None:
    """Asynchronously add nodes to index, writing each batch to a randomly chosen resource."""
    if not nodes:
        return

    # Example: list of Azure resources
    azure_resources = [
        "YOUR_AZURE_COSMOSDB_MONGODB_URI_1",
        "YOUR_AZURE_COSMOSDB_MONGODB_URI_2",
        "YOUR_AZURE_COSMOSDB_MONGODB_URI_3"
    ]

    for nodes_batch in iter_batch(nodes, self._insert_batch_size):
        nodes_batch = await self._aget_node_with_embedding(
            nodes_batch, show_progress
        )

        # Randomly select an Azure resource and build a vector store for it
        selected_resource = random.choice(azure_resources)
        mongodb_client = pymongo.MongoClient(selected_resource)
        vector_store = AzureCosmosDBMongoDBVectorSearch(
            mongodb_client=mongodb_client,
            db_name="demo_vectordb",
            collection_name="paul_graham_essay",
        )

        new_ids = await vector_store.async_add(nodes_batch, **insert_kwargs)

        # if the vector store doesn't store text, we need to add the nodes to the
        # index struct and document store
        if not vector_store.stores_text or self._store_nodes_override:
            for node, new_id in zip(nodes_batch, new_ids):
                # NOTE: remove embedding from node to avoid duplication
                node_without_embedding = node.copy()
                node_without_embedding.embedding = None

                index_struct.add_node(node_without_embedding, text_id=new_id)
                self._docstore.add_documents(
                    [node_without_embedding], allow_update=True
                )
        else:
            # NOTE: if the vector store keeps text,
            # we only need to add image and index nodes
            for node, new_id in zip(nodes_batch, new_ids):
                if isinstance(node, (ImageNode, IndexNode)):
                    # NOTE: remove embedding from node to avoid duplication
                    node_without_embedding = node.copy()
                    node_without_embedding.embedding = None

                    index_struct.add_node(node_without_embedding, text_id=new_id)
                    self._docstore.add_documents(
                        [node_without_embedding], allow_update=True
                    )

In this example, azure_resources is a list of Azure CosmosDB MongoDB URIs, and selected_resource is randomly chosen from this list for each batch. The selected resource is then used to create a MongoClient and an instance of AzureCosmosDBMongoDBVectorSearch that the batch is written to. Note that this variant balances the write destination (the vector store), not the embedding calls; if the rate limit is on the embedding requests themselves, the rotation needs to happen before _aget_node_with_embedding, as in the next comment [1][2][3].
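
Since the snippet above is a drop-in replacement for VectorStoreIndex._async_add_nodes_to_index, one way to apply it without editing the installed package is to assign it onto the class. A sketch (this affects every instance created afterwards):

from llama_index.core.indices.vector_store.base import VectorStoreIndex

# Monkey-patch the override onto the class before building any index
VectorStoreIndex._async_add_nodes_to_index = _async_add_nodes_to_index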

To continue talking to Dosu, mention @dosu.

marcodgiudice commented 3 weeks ago

@dosu is there a way of doing this without extending the llama_index library?

dosubot[bot] commented 3 weeks ago

Yes, you can achieve dynamic selection of Azure resources for each asynchronous job without modifying the llama_index source, by subclassing VectorStoreIndex and leveraging its existing functionality.

Here's how you can do it:

  1. Define a list of available Azure resources.
  2. Implement a function to randomly select a resource.
  3. Use the selected resource to configure the AzureOpenAIEmbedding instance for each job.

The azure_resources list, get_random_azure_resource helper, and AzureOpenAIEmbedding setup are the same as in the previous comment, so they are not repeated here.

To integrate this into the indexing flow, subclass VectorStoreIndex and override the _async_add_nodes_to_index method to include the random selection of Azure resources. Here is an example:

import random
from typing import Any, Sequence

from llama_index.core.data_structs.data_structs import IndexDict
from llama_index.core.indices.vector_store.base import VectorStoreIndex
from llama_index.core.schema import BaseNode, ImageNode, IndexNode
from llama_index.core.utils import iter_batch
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding

# azure_resources and get_random_azure_resource come from the previous comment

class CustomVectorStoreIndex(VectorStoreIndex):
    async def _async_add_nodes_to_index(
        self,
        index_struct: IndexDict,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **insert_kwargs: Any,
    ) -> None:
        """Asynchronously add nodes to index, embedding each batch via a random resource."""
        if not nodes:
            return

        for nodes_batch in iter_batch(nodes, self._insert_batch_size):
            # Randomly select an Azure resource and swap in an embed model for it
            # BEFORE computing embeddings, so the embedding requests (where the
            # rate limit is hit) are spread across resources.
            selected_resource = get_random_azure_resource()
            self._embed_model = AzureOpenAIEmbedding(
                model="text-embedding-ada-002",
                azure_endpoint=selected_resource["azure_endpoint"],
                azure_deployment=selected_resource["azure_deployment"],
                api_key="your-api-key",
                api_version="your-api-version",
                max_retries=0,
                embed_batch_size=1,
                num_workers=4
            )

            nodes_batch = await self._aget_node_with_embedding(
                nodes_batch, show_progress
            )

            new_ids = await self._vector_store.async_add(nodes_batch, **insert_kwargs)

            # if the vector store doesn't store text, we need to add the nodes to the
            # index struct and document store
            if not self._vector_store.stores_text or self._store_nodes_override:
                for node, new_id in zip(nodes_batch, new_ids):
                    # NOTE: remove embedding from node to avoid duplication
                    node_without_embedding = node.copy()
                    node_without_embedding.embedding = None
                    index_struct.add_node(node_without_embedding, text_id=new_id)
                    self._docstore.add_documents(
                        [node_without_embedding], allow_update=True
                    )
            else:
                # NOTE: if the vector store keeps text,
                # we only need to add image and index nodes
                for node, new_id in zip(nodes_batch, new_ids):
                    if isinstance(node, (ImageNode, IndexNode)):
                        # NOTE: remove embedding from node to avoid duplication
                        node_without_embedding = node.copy()
                        node_without_embedding.embedding = None

                        index_struct.add_node(node_without_embedding, text_id=new_id)
                        self._docstore.add_documents(
                            [node_without_embedding], allow_update=True
                        )
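
Usage then mirrors the initialization from the question, just with the subclass:

index = CustomVectorStoreIndex(nodes=nodes, storage_context=self.storage_context, use_async=True)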

This approach lets you dynamically select a different Azure resource for each batch of embedding calls without modifying the llama_index source, only subclassing it [1][2].
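
If you prefer not to touch VectorStoreIndex internals at all, another option is to pre-compute the embeddings yourself, rotating across resources, and then build the index from the already-embedded nodes. A minimal sketch, assuming the azure_resources list from the first comment and that the embedding helpers in llama_index core skip nodes whose embedding is already set:

import asyncio

from llama_index.core import VectorStoreIndex
from llama_index.core.schema import MetadataMode
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding

async def embed_nodes_round_robin(nodes, resources, api_key, api_version):
    # One embed model per Azure resource (placeholder credentials as above)
    embed_models = [
        AzureOpenAIEmbedding(
            model="text-embedding-ada-002",
            azure_endpoint=r["azure_endpoint"],
            azure_deployment=r["azure_deployment"],
            api_key=api_key,
            api_version=api_version,
        )
        for r in resources
    ]
    # Round-robin the node texts across the resources and embed them concurrently
    texts = [n.get_content(metadata_mode=MetadataMode.EMBED) for n in nodes]
    chunks = [texts[i::len(embed_models)] for i in range(len(embed_models))]
    results = await asyncio.gather(
        *(m.aget_text_embedding_batch(c) for m, c in zip(embed_models, chunks))
    )
    # Write the embeddings back onto the nodes in the same round-robin order
    for i, node in enumerate(nodes):
        node.embedding = results[i % len(embed_models)][i // len(embed_models)]
    return nodes

# Nodes that already carry embeddings are not re-embedded by the index
nodes = asyncio.run(embed_nodes_round_robin(nodes, azure_resources, "your-api-key", "your-api-version"))
index = VectorStoreIndex(nodes=nodes, storage_context=storage_context, use_async=True)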

To continue talking to Dosu, mention @dosu.