langchain-ai / langchain

🦜🔗 Build context-aware reasoning applications
https://python.langchain.com
MIT License

AzureSearch.avector_search_with_score() triggers "TypeError: 'AsyncSearchItemPaged' object is not iterable" when calling _results_to_documents() #24740

Open chrislrobert opened 1 month ago

chrislrobert commented 1 month ago

Example Code

embeddings = AzureOpenAIEmbeddings(
    azure_endpoint=azure_endpoint,
    openai_api_version=openai_api_version,
    openai_api_key=openai_api_key,
    openai_api_type=openai_api_type,
    deployment=deployment,
    chunk_size=1)

vectorstore = AzureSearch(
    azure_search_endpoint=azure_search_endpoint,
    azure_search_key=azure_search_key,
    index_name=index_name,
    embedding_function=embeddings.embed_query,
)

system_message_prompt = SystemMessagePromptTemplate.from_template(
    system_prompt)
human_message_prompt = HumanMessagePromptTemplate.from_template(
    human_template)
chat_prompt = ChatPromptTemplate.from_messages(
    [system_message_prompt, human_message_prompt])

doc_chain = load_qa_chain(
    conversation_llm, chain_type="stuff", prompt=chat_prompt, callback_manager=default_manager
)

conversation_chain = ConversationalRetrievalChain(
    retriever=vectorstore.as_retriever(search_type="similarity_score_threshold", k=rag_top_k,
                                       search_kwargs={"score_threshold": rag_score_threshold}),
    combine_docs_chain=doc_chain,
    question_generator=question_generator,
    return_source_documents=True,
    callback_manager=default_manager,
    rephrase_question=False,
    memory=memory,
    max_tokens_limit=max_retrieval_tokens,
)

result = await conversation_chain.ainvoke({"question": question, "chat_history": chat_history})

Error Message and Stack Trace (if applicable)

TypeError("'AsyncSearchItemPaged' object is not iterable")
Traceback (most recent call last):
  File "/Users/crobert/Code/Higher Bar AI/almitra-pilot-be/venv/lib/python3.10/site-packages/langchain/chains/base.py", line 208, in ainvoke
    await self._acall(inputs, run_manager=run_manager)
  File "/Users/crobert/Code/Higher Bar AI/almitra-pilot-be/venv/lib/python3.10/site-packages/langchain/chains/conversational_retrieval/base.py", line 212, in _acall
    docs = await self._aget_docs(new_question, inputs, run_manager=_run_manager)
  File "/Users/crobert/Code/Higher Bar AI/almitra-pilot-be/venv/lib/python3.10/site-packages/langchain/chains/conversational_retrieval/base.py", line 410, in _aget_docs
    docs = await self.retriever.ainvoke(
  File "/Users/crobert/Code/Higher Bar AI/almitra-pilot-be/venv/lib/python3.10/site-packages/langchain_core/retrievers.py", line 280, in ainvoke
    raise e
  File "/Users/crobert/Code/Higher Bar AI/almitra-pilot-be/venv/lib/python3.10/site-packages/langchain_core/retrievers.py", line 273, in ainvoke
    result = await self._aget_relevant_documents(
  File "/Users/crobert/Code/Higher Bar AI/almitra-pilot-be/venv/lib/python3.10/site-packages/langchain_community/vectorstores/azuresearch.py", line 1590, in _aget_relevant_documents
    await self.vectorstore.asimilarity_search_with_relevance_scores(
  File "/Users/crobert/Code/Higher Bar AI/almitra-pilot-be/venv/lib/python3.10/site-packages/langchain_community/vectorstores/azuresearch.py", line 663, in asimilarity_search_with_relevance_scores
    result = await self.avector_search_with_score(query, k=k, **kwargs)
  File "/Users/crobert/Code/Higher Bar AI/almitra-pilot-be/venv/lib/python3.10/site-packages/langchain_community/vectorstores/azuresearch.py", line 750, in avector_search_with_score
    return _results_to_documents(results)
  File "/Users/crobert/Code/Higher Bar AI/almitra-pilot-be/venv/lib/python3.10/site-packages/langchain_community/vectorstores/azuresearch.py", line 1623, in _results_to_documents
    docs = [
TypeError: 'AsyncSearchItemPaged' object is not iterable

Description

This commit for issue #24064 caused a regression in async support. After that commit, avector_search_with_score() calls _asimple_search(), which uses async with self.async_client, and then passes the results to _results_to_documents(). That call triggers "TypeError: 'AsyncSearchItemPaged' object is not iterable" because it uses the AsyncSearchItemPaged on a closed HTTP connection (the connection is closed at the end of the with block in _asimple_search()).
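
One Python detail that may be relevant here: an object that defines only __aiter__ cannot be consumed by a plain for loop or list comprehension, and attempting it raises this kind of TypeError regardless of connection state. The sketch below uses a hypothetical FakeAsyncPaged stand-in, not the real Azure SDK class, and whether this alone accounts for the failure in _results_to_documents() is an assumption rather than something confirmed in this thread.

```python
import asyncio


class FakeAsyncPaged:
    """Hypothetical stand-in for an async paged result: defines __aiter__ only."""

    def __init__(self, items):
        self._items = list(items)

    def __aiter__(self):
        return self._generate()

    async def _generate(self):
        for item in self._items:
            yield item


def sync_consume(results):
    # A plain comprehension looks for __iter__, which the object does not define.
    return [item for item in results]


async def async_consume(results):
    # An async comprehension goes through __aiter__/__anext__ instead.
    return [item async for item in results]


def try_sync():
    try:
        sync_consume(FakeAsyncPaged([1, 2, 3]))
    except TypeError as exc:
        return str(exc)
    return None


sync_error = try_sync()
async_items = asyncio.run(async_consume(FakeAsyncPaged([1, 2, 3])))
print(sync_error)
print(async_items)
```

With the real client, the async comprehension would also have to run while the client is still open, which is what the override further down does.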

The original async PR #22075 seemed to have the right idea: the async results need to be handled within the with block. Looking at that code, it looks like it should probably work. However, if I roll back to 0.2.7, I run into the "KeyError('content_vector')" that triggered issue #24064. For the moment, I've gotten things running by overriding AzureSearch as follows:

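import json
from typing import Any, List, Optional, Tuple

import numpy as np
from langchain_community.vectorstores.azuresearch import (
    FIELDS_CONTENT,
    FIELDS_CONTENT_VECTOR,
    FIELDS_METADATA,
    AzureSearch,
)
from langchain_core.documents import Document


class ExtendedAzureSearch(AzureSearch):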
    """Extended AzureSearch class with patch to fix async support."""

    async def _asimple_search_docs(
        self,
        embedding: List[float],
        text_query: str,
        k: int,
        *,
        filters: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Perform vector or hybrid search in the Azure search index.

        Args:
            embedding: A vector embedding to search in the vector space.
            text_query: A full-text search query expression;
                Use "*" or omit this parameter to perform only vector search.
            k: Number of documents to return.
            filters: Filtering expression.
        Returns:
            Matching documents with scores
        """
        from azure.search.documents.models import VectorizedQuery

        async with self.async_client as async_client:
            results = await async_client.search(
                search_text=text_query,
                vector_queries=[
                    VectorizedQuery(
                        vector=np.array(embedding, dtype=np.float32).tolist(),
                        k_nearest_neighbors=k,
                        fields=FIELDS_CONTENT_VECTOR,
                    )
                ],
                filter=filters,
                top=k,
                **kwargs,
            )
            docs = [
                (
                    Document(
                        page_content=result.pop(FIELDS_CONTENT),
                        metadata=json.loads(result[FIELDS_METADATA])
                        if FIELDS_METADATA in result
                        else {
                            key: value for key, value in result.items() if key != FIELDS_CONTENT_VECTOR
                        },
                    ),
                    float(result["@search.score"]),
                )
                async for result in results
            ]
        return docs

    # AP-254 - This version of avector_search_with_score() calls _asimple_search_docs() instead of _asimple_search()
    # followed by _results_to_documents(results) because _asimple_search() uses `async with self.async_client`, which
    # closes the paging connection on return, which makes it so the results are not available for
    # _results_to_documents() (triggering "TypeError: 'AsyncSearchItemPaged' object is not iterable").
    async def avector_search_with_score(
        self,
        query: str,
        k: int = 4,
        filters: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            query (str): Text to look up documents similar to.
            k (int, optional): Number of Documents to return. Defaults to 4.
            filters (str, optional): Filtering expression. Defaults to None.

        Returns:
            List[Tuple[Document, float]]: List of Documents most similar
                to the query and score for each
        """
        embedding = await self._aembed_query(query)
        return await self._asimple_search_docs(
            embedding, "", k, filters=filters, **kwargs
        )

System Info

System Information

OS: Darwin
OS Version: Darwin Kernel Version 23.5.0: Wed May 1 20:12:58 PDT 2024; root:xnu-10063.121.3~5/RELEASE_ARM64_T6000
Python Version: 3.10.9 (v3.10.9:1dd9be6584, Dec 6 2022, 14:37:36) [Clang 13.0.0 (clang-1300.0.29.30)]

Package Information

langchain_core: 0.2.9
langchain: 0.2.11
langchain_community: 0.2.10
langsmith: 0.1.81
langchain_aws: 0.1.7
langchain_openai: 0.1.8
langchain_text_splitters: 0.2.2
langchainplus_sdk: 0.0.21
langgraph: 0.1.14

chrislrobert commented 1 month ago

Actually, my fix (and, I presume, the original async code) still has a pretty fatal flaw: because all of these with blocks close the client, you can only make one call in a chain. Any subsequent call triggers this:

ValueError("HTTP transport has already been closed. You may check if you're calling a function outside of the async with of your client creation, or if you called await close() on your client already.")

Rather than having to re-establish the connection with each call, I have tried keeping it open and closing it on deletion of the AzureSearch object. But all of this makes me uncomfortable, and I wonder what @baskaryan in PR #22075 had in mind re: the management of the async Azure search client. It's all still feeling quite messy and not-quite-complete...
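
The two lifetimes discussed above can be sketched with stand-in classes. Everything here (FakeAsyncClient, PerCallStore, LongLivedStore, aclose) is hypothetical and only models the behaviour described in this thread: a client whose transport closes when its async with block exits. It is not the Azure SDK or the LangChain implementation.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for an async client whose transport closes on exit."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        await self.close()

    async def close(self):
        self.closed = True

    async def search(self, text):
        if self.closed:
            raise ValueError("HTTP transport has already been closed.")
        return [f"hit for {text}"]


class PerCallStore:
    """Wraps every call in `async with`, so the first call closes the client."""

    def __init__(self, client):
        self.client = client

    async def asearch(self, text):
        async with self.client as client:
            return await client.search(text)


class LongLivedStore:
    """Keeps the client open across calls; the owner closes it explicitly."""

    def __init__(self, client):
        self.client = client

    async def asearch(self, text):
        return await self.client.search(text)

    async def aclose(self):
        await self.client.close()


async def demo():
    per_call = PerCallStore(FakeAsyncClient())
    first = await per_call.asearch("a")
    try:
        await per_call.asearch("b")
        second_error = None
    except ValueError as exc:
        second_error = str(exc)

    long_lived = LongLivedStore(FakeAsyncClient())
    try:
        results = [await long_lived.asearch(q) for q in ("a", "b")]
    finally:
        # Explicit cleanup instead of relying on object deletion.
        await long_lived.aclose()
    return first, second_error, results


first, second_error, results = asyncio.run(demo())
print(first, second_error, results)
```

An explicit aclose() in a finally block is one possible alternative to closing on deletion, since finalizers cannot await; whether that fits AzureSearch is an open question here.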

sherlock-tez commented 1 month ago

I ran into the same issue lol.

async def asemantic_hybrid_search_with_score_and_rerank(
    self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any
) -> List[Tuple[Document, float, float]]:
    """Return docs most similar to query with a hybrid query.

    Args:
        query: Text to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filters: Filtering expression.

    Returns:
        List of Documents most similar to the query and score for each
    """
    from azure.search.documents.models import VectorizedQuery

    vector = await self._aembed_query(query)
    async with self.async_client as async_client:
        results = await async_client.search(
            search_text=query,
            vector_queries=[
                VectorizedQuery(
                    vector=np.array(vector, dtype=np.float32).tolist(),
                    k_nearest_neighbors=k,
                    fields=FIELDS_CONTENT_VECTOR,
                )
            ],
            filter=filters,
            query_type="semantic",
            semantic_configuration_name=self.semantic_configuration_name,
            query_caption="extractive",
            query_answer="extractive",
            top=k,
            **kwargs,
        )

JuliaZamaitat commented 1 month ago

I am also running into the same error message when calling ainvoke on the AzureSearchVectorStoreRetriever, which also uses the methods described above. The synchronous call works fine, but the async call causes the error "'AsyncSearchItemPaged' object is not iterable" during the retrieval phase.

chrislrobert commented 1 month ago

@JuliaZamaitat, you can override the class to remove the with blocks and make other adjustments as needed. @thedavgar has made the changes in #24921, so you can use that as a guide until it's been approved and included in a release. Hopefully it won't be long before the async support is working well!

prawiraelang commented 1 week ago

Hi, I encountered the same issue. I've read about this issue and the PR #24921 that was created to fix it. However, I can't find the solution code in the latest release of langchain-community==0.2.12 here. Is it expected that the latest version still contains this issue, or did I miss something?

chrislrobert commented 1 week ago

@prawiraelang I was confused too, as the changes were merged into master Aug. 13. However, the current release of langchain-community was on Aug. 12. So, I guess we just have to wait till they issue a new release...
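
To check which release is actually installed before deciding whether a workaround is still needed, something like the following may help. It is a minimal sketch using only the standard library; installed_version is a hypothetical helper name, and whether a given version contains the fix still has to be confirmed against the release itself.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Prints the installed version string, or None when the package is absent.
print(installed_version("langchain-community"))
```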

prawiraelang commented 1 week ago

Thanks @chrislrobert for your response. Let's wait then. Hopefully, the fix will be released soon.