run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Question]: How do I prevent vectors from OpenSearch from being included in the payload during querying? #17003

Open · dblilienthal opened this issue 2 days ago

dblilienthal commented 2 days ago


Question

After doing some analysis with the LlamaDebugHandler, I noticed that all of the vectors and metadata are sent as context to the LLM when using the query engine. This adds unnecessary noise, incurs extra charges on our account, and can degrade the performance of our applications. How do I restrict the payload so that only the document text is sent?

from llama_index.core.callbacks import (
    CallbackManager,
    LlamaDebugHandler,
    CBEventType,
)
from llama_index.core import Settings
llama_debug = LlamaDebugHandler(print_trace_on_end=True)
callback_manager = CallbackManager([llama_debug])
Settings.callback_manager = callback_manager

os_kwargs = dict(
    endpoint=OPENSEARCH_URL,
    index=COLLECTION_NAME,
    dim=EMBEDDING_DIM,
    embedding_field="vector_field",
    text_field="page_text",
    http_auth=awsauth, 
    use_ssl=True, 
    verify_certs=True, 
    connection_class=RequestsHttpConnection, 
    engine="faiss"
)

os_vector_client = OpensearchVectorClient(**os_kwargs)
vector_store = OpensearchVectorStore(os_vector_client)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_vector_store(vector_store, storage_context=storage_context)

as_query_engine_kwargs = {
    "response_mode": "tree_summarize",
    "llm": Settings.llm,
    "similarity_top_k": 2,
}

query_engine = index.as_query_engine(**as_query_engine_kwargs)
response = query_engine.query("<some general query>")
print(response)
.......

event_pairs = llama_debug.get_llm_inputs_outputs()
print(event_pairs[0][1].payload['messages'][1])

The output looks like this:

user: Context information from multiple sources is below.
---------------------
metadata: {'project_metadata': {'...}, 'document_metadata': {...}}
page_text:  ...

vector_field: [0.002726590959355235, ..., -0.006757325027137995,
---------------------
Given the information from multiple sources and not prior knowledge, answer the query.
Query: <query question>
Answer: 
dosubot[bot] commented 2 days ago

Hey @dblilienthal, I'm here to help you with any bugs, questions, or to guide you on becoming a contributor. Let me know how I can assist you! 😊

To prevent vectors from being included in the payload during querying in OpenSearch, you need to modify the _gen_query_request method to set include_vector to False. This will ensure that only the document text is sent in the query request. Here's how you can do it:

def _gen_query_request(self, query: VectorStoreQuery) -> models.QueryRequest:
    """
    Generate the OpenSearch query request.

    Args:
        query (VectorStoreQuery): The vector store query

    Return:
        OpenSearch query request
    """
    filter = _to_ha3_engine_filter(query.filters)
    request = models.QueryRequest(
        table_name=self._config.table_name,
        namespace=self._config.namespace,
        vector=query.query_embedding,
        top_k=query.similarity_top_k,
        filter=filter,
        include_vector=False,  # Set this to False to exclude vectors
        output_fields=self._config.output_fields,
    )

    if self._config.search_config:
        request.order = self._config.search_config.get("order", "ASC")
        score_threshold: float = self._config.search_config.get(
            "score_threshold", None
        )
        if score_threshold is not None:
            request.score_threshold = score_threshold
        search_params = self._config.search_config.get("search_params", None)
        if search_params is not None:
            request.search_params = json.dumps(search_params)
    return request

By setting include_vector=False, the vectors will not be included in the query payload, thus reducing unnecessary data being sent and potentially improving performance and reducing costs [1].


logan-markewich commented 2 days ago

@dblilienthal Did you put the embedding into the node metadata? I'm unable to reproduce this with default settings

For example:

docker run -d -p 9200:9200 -p 9600:9600 -e "discovery.type=single-node" -e "OPENSEARCH_INITIAL_ADMIN_PASSWORD=<some password>" -e "plugins.security.disabled=true" opensearchproject/opensearch:latest

And then

from os import getenv
from llama_index.core import SimpleDirectoryReader
from llama_index.vector_stores.opensearch import (
    OpensearchVectorStore,
    OpensearchVectorClient,
)
from llama_index.core import VectorStoreIndex, StorageContext

# http endpoint for your cluster (opensearch required for vector index usage)
endpoint = getenv("OPENSEARCH_ENDPOINT", "http://127.0.0.1:9200")

# index to demonstrate the VectorStore impl
idx = getenv("OPENSEARCH_INDEX", "llama-index-demo")

# load some sample data
documents = SimpleDirectoryReader("./data/paul_graham/").load_data()

# OpensearchVectorClient stores text in this field by default
text_field = "content"
# OpensearchVectorClient stores embeddings in this field by default
embedding_field = "embedding"
# OpensearchVectorClient encapsulates logic for a
# single opensearch index with vector search enabled
client = OpensearchVectorClient(
    endpoint, idx, 1536, embedding_field=embedding_field, text_field=text_field
)
# initialize vector store
vector_store = OpensearchVectorStore(client)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# initialize an index using our sample data and the client we just created
index = VectorStoreIndex.from_documents(
    documents=documents, storage_context=storage_context
)

# print the node format that the llm will get
nodes = index.as_retriever().retrieve("What did the author do growing up?")
print(nodes[0].node.get_content(metadata_mode="llm"))
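
If the embedding did somehow land in the node metadata, a quick way to confirm that (and to hide any metadata key from the LLM without re-ingesting) is to inspect node.metadata and use excluded_llm_metadata_keys. Rough sketch reusing the retriever above; in your setup the key to exclude would presumably be the "vector_field" from your os_kwargs:

from llama_index.core.schema import MetadataMode

nodes = index.as_retriever().retrieve("What did the author do growing up?")
node = nodes[0].node

# If the embedding was written into metadata, its key will show up here
print(node.metadata.keys())

# Hide a metadata key from the LLM-visible text (per-node here; to make it stick,
# set excluded_llm_metadata_keys on the documents/nodes before building the index)
node.excluded_llm_metadata_keys.append("vector_field")  # use whatever key showed up above
print(node.get_content(metadata_mode=MetadataMode.LLM))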
dblilienthal commented 2 days ago

@logan-markewich I'm not entirely sure how to tell whether the embedding is in the node metadata. I did, however, figure out a workaround: I overrode a few methods on OpensearchVectorClient and OpensearchVectorStore so that we can manually specify which source fields to return from the OpenSearch query.

class CustomOpensearchVectorClient(OpensearchVectorClient):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _default_scoring_script_query(
        self,
        query_vector: List[float],
        k: int = 4,
        space_type: str = "l2Squared",
        pre_filter: Optional[Union[Dict, List]] = None,
        vector_field: str = "embedding",
        source_include: Optional[List] = None,
    ) -> Dict:
        """For Scoring Script Search, this is the default query. Has to account for Opensearch Service
        Serverless which does not support painless scripting functions so defaults to knn_score.
        """
        if not pre_filter:
            pre_filter = MATCH_ALL_QUERY

        # check if we can use painless scripting or have to use default knn_score script
        if self.is_aoss:
            if space_type == "l2Squared":
                raise ValueError(
                    "Unsupported space type for aoss. Can only use l1, l2, cosinesimil."
                )
            script = self._get_knn_scoring_script(
                space_type, vector_field, query_vector
            )
        else:
            script = self._get_painless_scoring_script(
                space_type, vector_field, query_vector
            )
        response = {
            "size": k,
            "query": {
                "script_score": {
                    "query": pre_filter,
                    "script": script,
                }
            },
        }
        if source_include:
            response["_source"] = {"includes": source_include}
        return response 

    def _default_approximate_search_query(
        self,
        query_vector: List[float],
        k: int = 4,
        vector_field: str = "embedding",
        source_include: Optional[List] = None,
    ) -> Dict:
        """For Approximate k-NN Search, this is the default query."""
        result = {
            "size": k,
            "query": {"knn": {vector_field: {"vector": query_vector, "k": k}}},
        }
        if source_include:
            result["_source"] = {"includes": source_include}
        return result

    def _knn_search_query(
        self,
        embedding_field: str,
        query_embedding: List[float],
        k: int,
        filters: Optional[MetadataFilters] = None,
        source_include: Optional[List[str]] = None,
    ) -> Dict:
        """
        Do knn search.

        If there are no filters do approx-knn search.
        If there are (pre)-filters, do an exhaustive exact knn search using 'painless
            scripting' if the version of Opensearch supports it, otherwise uses knn_score scripting score.

        Note:
            -AWS Opensearch Serverless does not support the painless scripting functionality at this time according to AWS.
            -Also note that approximate knn search does not support pre-filtering.

        Args:
            query_embedding: Vector embedding to query.
            k: Maximum number of results.
            filters: Optional filters to apply before the search.
                Supports filter-context queries documented at
                https://opensearch.org/docs/latest/query-dsl/query-filter-context/
            source_include: Optional list of fields to include in the response.

        Returns:
            Up to k docs closest to query_embedding
        """
        pre_filter = self._parse_filters(filters)
        if not pre_filter:
            search_query = self._default_approximate_search_query(
                query_embedding, k, vector_field=embedding_field, source_include=source_include
            )
        elif self.is_aoss:
            # if is_aoss is set we are using Opensearch Serverless AWS offering which cannot use
            # painless scripting so default scoring script returned will be just normal knn_score script
            search_query = self._default_scoring_script_query(
                query_embedding,
                k,
                space_type=self.space_type,
                pre_filter={"bool": {"filter": pre_filter}},
                vector_field=embedding_field,
                source_include=source_include,
            )
        else:
            # https://opensearch.org/docs/latest/search-plugins/knn/painless-functions/
            search_query = self._default_scoring_script_query(
                query_embedding,
                k,
                space_type="l2Squared",
                pre_filter={"bool": {"filter": pre_filter}},
                vector_field=embedding_field,
                source_include=source_include,
            )
        return search_query

    def query(
        self,
        query_mode: VectorStoreQueryMode,
        query_str: Optional[str],
        query_embedding: List[float],
        k: int,
        filters: Optional[MetadataFilters] = None,
        source_include: Optional[List[str]] = None,
    ) -> VectorStoreQueryResult:
        if query_mode == VectorStoreQueryMode.HYBRID:
            if query_str is None or self._search_pipeline is None:
                raise ValueError(INVALID_HYBRID_QUERY_ERROR)
            search_query = self._hybrid_search_query(
                self._text_field,
                query_str,
                self._embedding_field,
                query_embedding,
                k,
                filters=filters,
            )
            params = {
                "search_pipeline": self._search_pipeline,
            }
        elif query_mode == VectorStoreQueryMode.TEXT_SEARCH:
            search_query = self._lexical_search_query(
                self._text_field, query_str, k, filters=filters
            )
            params = None
        else:
            search_query = self._knn_search_query(
                self._embedding_field, query_embedding, k, filters=filters, source_include=source_include
            )
            params = None

        res = self._os_client.search(
            index=self._index, body=search_query, params=params
        )

        return self._to_query_result(res)

    async def aquery(
        self,
        query_mode: VectorStoreQueryMode,
        query_str: Optional[str],
        query_embedding: List[float],
        k: int,
        filters: Optional[MetadataFilters] = None,
        source_include: Optional[List[str]] = None,
    ) -> VectorStoreQueryResult:
        if query_mode == VectorStoreQueryMode.HYBRID:
            if query_str is None or self._search_pipeline is None:
                raise ValueError(INVALID_HYBRID_QUERY_ERROR)
            search_query = self._hybrid_search_query(
                self._text_field,
                query_str,
                self._embedding_field,
                query_embedding,
                k,
                filters=filters,
            )
            params = {
                "search_pipeline": self._search_pipeline,
            }
        elif query_mode == VectorStoreQueryMode.TEXT_SEARCH:
            search_query = self._lexical_search_query(
                self._text_field, query_str, k, filters=filters
            )
            params = None
        else:
            search_query = self._knn_search_query(
                self._embedding_field, query_embedding, k, filters=filters, source_include=source_include
            )
            params = None

        res = await self._os_async_client.search(
            index=self._index, body=search_query, params=params
        )

        return self._to_query_result(res)

class CustomOpensearchVectorStore(OpensearchVectorStore):
    """This custom class adds the options to include source fields in the response to limit the payload size of the response."""
    source_include: Optional[List[str]] = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """
        Query index for top k most similar nodes.

        Args:
            query (VectorStoreQuery): Store query object.

        """
        query_embedding = cast(List[float], query.query_embedding)

        return self._client.query(
            query.mode,
            query.query_str,
            query_embedding,
            query.similarity_top_k,
            filters=query.filters,
            source_include=self.source_include
        )

    async def aquery(
        self, query: VectorStoreQuery, **kwargs: Any
    ) -> VectorStoreQueryResult:
        """
        Async query index for top k most similar nodes.

        Args:
            query (VectorStoreQuery): Store query object.

        """
        query_embedding = cast(List[float], query.query_embedding)

        return await self._client.aquery(
            query.mode,
            query.query_str,
            query_embedding,
            query.similarity_top_k,
            filters=query.filters,
            source_include=self.source_include
        )

os_vector_client = CustomOpensearchVectorClient(**os_kwargs)
vector_store = CustomOpensearchVectorStore(os_vector_client)
vector_store.source_include = ["page_text"]

storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_vector_store(vector_store, storage_context=storage_context)
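
With source_include restricted to ["page_text"], the _source returned by OpenSearch should no longer carry the embedding, so the same LlamaDebugHandler check from my first post can confirm the prompt now contains only the text (sketch, reusing the names defined above):

# Re-run the debug check from the original snippet; the vector_field should
# no longer appear in the context sent to the LLM.
query_engine = index.as_query_engine(**as_query_engine_kwargs)
response = query_engine.query("<some general query>")

event_pairs = llama_debug.get_llm_inputs_outputs()
print(event_pairs[0][1].payload['messages'][1])  # expect page_text only, no vector_field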
logan-markewich commented 2 days ago

tbh I'm a little lost on what you changed (I guess the source_include thing?). It's also weird that I wasn't able to reproduce.

Are you using an opensearch db that was populated without using llama-index? Or how did you initially create the index?

When using llama-index to populate the db, it just serializes the node into the db, so the embedding field should not be included (as evidenced by my own code above)
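
A quick way to check how the documents were actually written is to pull one raw hit from the index and look at its _source (sketch using the opensearch-py client directly; OPENSEARCH_URL, COLLECTION_NAME, and awsauth are the names from your original snippet):

from opensearchpy import OpenSearch, RequestsHttpConnection

# Grab a single raw document and inspect what was actually stored.
client = OpenSearch(
    hosts=[OPENSEARCH_URL],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
hit = client.search(index=COLLECTION_NAME, body={"size": 1})["hits"]["hits"][0]["_source"]

print(hit.keys())           # see which top-level fields were stored
print(hit.get("metadata"))  # if the embedding shows up inside metadata, that's what leaks into the prompt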