deepset-ai / haystack-core-integrations

Additional packages (components, document stores and the likes) to extend the capabilities of Haystack version 2.0 and onwards
https://haystack.deepset.ai
Apache License 2.0

MongoDBAtlasEmbeddingRetriever no longer working on Atlas Cloud - works on Atlas local #795

Closed: MikeRecognex closed this issue 1 month ago

MikeRecognex commented 1 month ago

Describe the bug
I have successfully been using the Haystack MongoDB integration for over four months on Haystack 2.x, via from haystack_integrations.components.retrievers.mongodb_atlas import MongoDBAtlasEmbeddingRetriever. Suddenly, Mongo embedding-based retrieval has stopped working against my cloud Atlas across all versions of my code (on both 2.0.0 and 2.2.0rc0), where it was definitely working in production earlier this week. This, together with the fact (explained below) that my local Atlas does not show the error, makes me suspect that MongoDB has introduced a breaking change in the aggregation pipeline. I am suddenly picking up the following error:

Error message

Retrieval of documents from MongoDB Atlas failed: invalid parameter: expected an object (filter), full error: {'ok': 0.0, 'errmsg': 'invalid parameter: expected an object (filter)', 'code': 10065, 'codeName': 'Location10065', '$clusterTime': {'clusterTime': Timestamp(1717846171, 14), 'signature': {'hash': b'7\xa3m?\xf6\xae3hBN\x0f\xf4\xe8\xbf~\xdd\x1a\xc8\xb4\x0f', 'keyId': 7340746021133090832}}, 'operationTime': Timestamp(1717846171, 14)}

IMPORTANTLY: I run both a cloud Atlas instance and a local dev instance, and the error has only just started occurring on the cloud version; it does NOT appear when retrieving from my local Atlas.
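For what it is worth, the error message itself points at the "filter" value inside the $vectorSearch stage. Below is a hypothetical minimal reproduction with pymongo, assuming (and this is only a guess at this point) that the cloud cluster now rejects a filter value that is not a document; the connection string is a placeholder and the database, collection, and index names are the ones mentioned in this issue.

# Hypothetical reproduction: Atlas rejects a $vectorSearch "filter" that is not an object.
from pymongo import MongoClient
from pymongo.errors import OperationFailure

client = MongoClient("mongodb+srv://...")          # placeholder connection string
collection = client["RFP"]["Response_Repository"]  # database / collection from this issue

pipeline = [
    {
        "$vectorSearch": {
            "index": "RFPRespRepoVector",
            "path": "embedding",
            "queryVector": [0.0] * 768,  # dummy 768-dimension vector
            "numCandidates": 10,
            "limit": 1,
            "filter": None,  # not an object, suspected trigger for code 10065
        }
    }
]

try:
    list(collection.aggregate(pipeline))
except OperationFailure as e:
    print(e.code, e.details)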

Full error below:

INFO:haystack.core.pipeline.pipeline:Running component retriever
2024-06-08 12:29:31.770 Uncaught app exception
Traceback (most recent call last):
  File "/xxxxx/python3.9/site-packages/haystack_integrations/document_stores/mongodb_atlas/document_store.py", line 250, in _embedding_retrieval
    documents = list(self.collection.aggregate(pipeline))
  File "xxxxxx/python3.9/site-packages/pymongo/collection.py", line 2720, in aggregate
    return self._aggregate(
  File "/xxxxxxx/haystack_env/lib/python3.9/site-packages/pymongo/_csot.py", line 107, in csot_wrapper
    return func(self, *args, **kwargs)
  File "xxxxxx/haystack_env/lib/python3.9/site-packages/pymongo/collection.py", line 2627, in _aggregate
    return self.__database.client._retryable_read(
  File "xxxxxxx/haystack_env/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1492, in _retryable_read
    return self._retry_internal(
  File "/xxxxxxxxx/haystack_env/lib/python3.9/site-packages/pymongo/_csot.py", line 107, in csot_wrapper
    return func(self, *args, **kwargs)
  File "/xxxxxx/haystack_env/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1453, in _retry_internal
    return _ClientConnectionRetryable(
  File "/xxxxx/haystack_env/lib/python3.9/site-packages/pymongo/mongo_client.py", line 2315, in run
    return self._read() if self._is_read else self._write()
  File "/xxxxxxxx/haystack_env/lib/python3.9/site-packages/pymongo/mongo_client.py", line 2445, in _read
    return self._func(self._session, self._server, conn, read_pref)  # type: ignore
  File "/xxxxxxxx/haystack_env/lib/python3.9/site-packages/pymongo/aggregation.py", line 164, in get_cursor
    result = conn.command(
  File "/xxxxxxx/haystack_env/lib/python3.9/site-packages/pymongo/helpers.py", line 322, in inner
    return func(*args, **kwargs)
  File "/xxxxxxx/haystack_env/lib/python3.9/site-packages/pymongo/pool.py", line 968, in command
    return command(
  File "/xxxxx/haystack_env/lib/python3.9/site-packages/pymongo/network.py", line 192, in command
    helpers._check_command_response(
  File "/xxxxxxxxxx/haystack_env/lib/python3.9/site-packages/pymongo/helpers.py", line 230, in _check_command_response
    raise OperationFailure(errmsg, code, response, max_wire_version)
pymongo.errors.OperationFailure: invalid parameter: expected an object (filter), full error: {'ok': 0.0, 'errmsg': 'invalid parameter: expected an object (filter)', 'code': 10065, 'codeName': 'Location10065', '$clusterTime': {'clusterTime': Timestamp(1717846171, 14), 'signature': {'hash': b'7\xa3m?\xf6\xae3hBN\x0f\xf4\xe8\xbf~\xdd\x1a\xc8\xb4\x0f', 'keyId': 7340746021133090832}}, 'operationTime': Timestamp(1717846171, 14)}

The above exception was the direct cause of the following exception:

.............

res = comp.run(**last_inputs[name])

File "xxxxxx/haystack_env/lib/python3.9/site-packages/haystack_integrations/components/retrievers/mongodb_atlas/embedding_retriever.py", line 114, in run docs = self.document_store._embedding_retrieval( File "xxxxxxx/haystack_env/lib/python3.9/site-packages/haystack_integrations/document_stores/mongodb_atlas/document_store.py", line 258, in _embedding_retrieval raise DocumentStoreError(msg) from e haystack.document_stores.errors.errors.DocumentStoreError: Retrieval of documents from MongoDB Atlas failed: invalid parameter: expected an object (filter), full error: {'ok': 0.0, 'errmsg': 'invalid parameter: expected an object (filter)', 'code': 10065, 'codeName': 'Location10065', '$clusterTime': {'clusterTime': Timestamp(1717846171, 14), 'signature': {'hash': b'7\xa3m?\xf6\xae3hBN\x0f\xf4\xe8\xbf~\xdd\x1a\xc8\xb4\x0f', 'keyId': 7340746021133090832}}, 'operationTime': Timestamp(1717846171, 14)}

Expected behavior
Expected MongoDBAtlasEmbeddingRetriever to continue functioning normally, as it has been doing for the last five months.

Additional context
The code works successfully for embedding-based retrieval on my local MongoDB Atlas installation, so I believe the error is due to a breaking change in MongoDB Atlas cloud. Here is my pipeline, which has worked to date:

rag_pipeline = Pipeline()
rag_pipeline.add_component(instance=query_embedder, name="query_embedder")
rag_pipeline.add_component(instance=MongoDBAtlasEmbeddingRetriever(document_store=new_document_store), name="retriever")
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
# rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")
rag_pipeline.add_component("llm", OllamaGenerator(model="llama2",timeout=240, url="http://localhost:11434/api/generate", generation_kwargs={"temperature": 0.35, "top_k": 30}))
rag_pipeline.connect("query_embedder", "retriever.query_embedding")
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

I have checked that vector search is actually available on my online Atlas. To do this, I successfully ran a search on the target collection using the following aggregation pipeline:

[ { "$vectorSearch": { "index": "RFPRespRepoVector", "path": "embedding", "queryVector": [0.037561170756816864, -0.045408181846141815, etc etc etc -0.042644526809453964], "numCandidates": 1, "limit": 1 } } ]

To Reproduce

I have redacted the steps to reproduce, as I have since identified the issue: the filters parameter is no longer optional. Probably because MongoDB wants its customers to filter their embedding searches wherever possible...


MikeRecognex commented 1 month ago

I believe that the issue was caused by MongoDB suddenly making the filters parameter mandatory for embedding-search retrieval. Please note that the Haystack documentation currently states that this is an optional parameter, per the Haystack MongoDB integration spec here: https://docs.haystack.deepset.ai/reference/integrations-mongodb-atlas
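If that is indeed what changed, the workaround on the Haystack side is simply to pass a filters argument when constructing the retriever. A minimal sketch, assuming MONGO_CONNECTION_STRING is set in the environment and reusing the database, collection, and index names from the test code further down; the filter value is only a near-no-op placeholder:

from haystack_integrations.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore
from haystack_integrations.components.retrievers.mongodb_atlas import MongoDBAtlasEmbeddingRetriever

document_store = MongoDBAtlasDocumentStore(
    database_name="RFP",
    collection_name="Response_Repository",
    vector_search_index="vector_index",
)

# Previously the retriever was built with no filters argument:
# retriever = MongoDBAtlasEmbeddingRetriever(document_store=document_store)

# Workaround: always pass a filters object (even a near-no-op one), so the
# $vectorSearch stage receives a proper document for its "filter" parameter.
filters = {"content": {"$ne": "zaphodbeedlebrox"}}
retriever = MongoDBAtlasEmbeddingRetriever(document_store=document_store, filters=filters, top_k=10)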

As soon as I added the filters parameter, I was able to get the vector embedding search working again. Please note that I could only do this after adding the target filter field to the vector search index, alongside the embedding field. To better explain, here is my vector search index in Mongo Atlas:

{ "fields": [ { "numDimensions": 768, "path": "embedding", "similarity": "euclidean", "type": "vector" }, { "path": "content", "type": "filter" } ] }

My working test code is below:

import os

from haystack import Pipeline, Document
from haystack.document_stores.types import DuplicatePolicy
from haystack.components.writers import DocumentWriter
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack_integrations.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore
from haystack_integrations.components.retrievers.mongodb_atlas import MongoDBAtlasEmbeddingRetriever

def set_env_variable(key, value):
    """Upsert an environment variable and write it to the .env file for persistence."""
    # Set the environment variable in the current session
    os.environ[key] = value

    # Track whether the key is already present in the .env file
    found = False
    updated_lines = []

    # Attempt to read the existing content from the .env file
    try:
        with open('.env', 'r') as f:
            lines = f.readlines()

            for line in lines:
                # Check if the current line contains the key
                if line.startswith(key + '='):
                    updated_lines.append(f'{key}={value}\n')
                    found = True
                else:
                    updated_lines.append(line)
    except FileNotFoundError:
        # If the file doesn't exist, it will be created below
        pass

    # If the key was not found in existing lines, append it
    if not found:
        updated_lines.append(f'{key}={value}\n')

    # Write the updated content back to the .env file
    with open('.env', 'w') as f:
        f.writelines(updated_lines)

# Set up the MongoDB Atlas connection
set_env_variable('MONGO_CONNECTION_STRING', 'xxxxxxxxxxxxx')
set_env_variable('OPENAI_API_KEY', 'xxxxxxxxxxxx')
set_env_variable('TOKENIZERS_PARALLELISM', 'false')

database_name = "RFP"
collection_name = "Response_Repository"
vector_search_index = "vector_index"

# Create a MongoDBAtlasDocumentStore instance
document_store = MongoDBAtlasDocumentStore(
    database_name=database_name,
    collection_name=collection_name,
    vector_search_index=vector_search_index,
)

print(f"Document store contains {document_store.count_documents()} documents")

# Create some example documents
documents = [
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome."),
]

# Define some more components
doc_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)
doc_embedder = SentenceTransformersDocumentEmbedder(model="intfloat/e5-base-v2")
query_embedder = SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2")

# Pipeline that ingests documents for retrieval
indexing_pipe = Pipeline()
indexing_pipe.add_component(instance=doc_embedder, name="doc_embedder")
indexing_pipe.add_component(instance=doc_writer, name="doc_writer")

indexing_pipe.connect("doc_embedder.documents", "doc_writer.documents")
indexing_pipe.run({"doc_embedder": {"documents": documents}})

print(f"Document store contains {document_store.count_documents()} documents after indexing")

# Define a filter that excludes documents where the content field is "zaphodbeedlebrox"
filters = {"content": {"$ne": "zaphodbeedlebrox"}}

# Print the filter for debugging
print("Filter being used:", filters)

# Build a RAG pipeline with a Retriever to get documents relevant to
# the query, a PromptBuilder to create a custom prompt and the OpenAIGenerator (LLM)
prompt_template = """
Given these documents, answer the question.\nDocuments:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}

\nQuestion: {{question}}
\nAnswer:
"""

rag_pipeline = Pipeline()
rag_pipeline.add_component(instance=query_embedder, name="query_embedder")
rag_pipeline.add_component(instance=MongoDBAtlasEmbeddingRetriever(document_store=document_store, filters=filters, top_k=10), name="retriever")
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
rag_pipeline.add_component(instance=OpenAIGenerator(model="gpt-4", generation_kwargs={"temperature": 0.2, "top_p": 0.3}), name="llm")
rag_pipeline.connect("query_embedder", "retriever.query_embedding")
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question on the data you just added.
question = "Where does Mark live?"
result = rag_pipeline.run(
    {
        "query_embedder": {"text": question},
        "prompt_builder": {"question": question},
    }
)
print(result)

MikeRecognex commented 1 month ago

It would be great to have this verified as what actually happened. If it is, then the documentation will need updating in multiple places.

anakin87 commented 1 month ago

Hello!

I tried to run our example with haystack-ai==2.2.1 and mongodb-atlas-haystack==0.3.0.

In particular, I tried it with 2 different vector_search_index configurations:

querying.run({"embedder": {"text": "What is a cross-encoder?"}}) (embedding retrieval without filters) works fine.


I suggest installing the latest version of haystack-ai, checking your vector_search_index, and letting us know.
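For anyone else hitting this, it may help to confirm which package versions are actually installed before and after upgrading. A small sketch using only the standard library:

# Print the installed versions of the relevant packages.
from importlib.metadata import version, PackageNotFoundError

for pkg in ("haystack-ai", "mongodb-atlas-haystack", "pymongo"):
    try:
        print(pkg, version(pkg))
    except PackageNotFoundError:
        print(pkg, "not installed")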

MikeRecognex commented 1 month ago

Thanks for your time looking into this! After realising I was on mongodb-atlas-haystack==0.2.0 and upgrading to 0.3.0, I believe the problem is fixed. I am extremely grateful for your help!