langchain-ai / langchain

🦜🔗 Build context-aware reasoning applications
https://python.langchain.com
MIT License

Record manager considers some of the documents as updated while they are not changed #21028

Closed MohammedShokr closed 5 months ago

MohammedShokr commented 5 months ago

Example Code

vectorstore = WeaviateVectorStore(
    self.client,
    index_name=self.index_name,
    embedding=self.embeddings,
    text_key=self.text_key,
    by_text=False,
)
# Create the record manager
namespace = f"weaviate/{self.index_name}"
record_manager = SQLRecordManager(
    namespace, db_url=self.db_url
)
# Add the summaries to the docs list
if summaries:
    docs.extend(summaries)

record_manager.create_schema()
# Index the documents to Weaviate with the record manager
result_dict = index(
    docs,
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)

Error Message and Stack Trace (if applicable)

No response

Description

When indexing a list of documents with the record manager in incremental cleanup mode, with each document assigned a unique identifier (UUID) as its source, I see unexpected behavior. The record manager deletes and re-indexes a subset of the documents even though those documents have not changed. Rerunning the same code with identical documents produces {'num_added': 80, 'num_updated': 0, 'num_skipped': 525, 'num_deleted': 80}.

I am using a recursive text splitter to segment the documents. I also generate a summary for each document and set the summary's source metadata to the source of the original document, so that the summary is treated as a chunk of the original document.

Finally, please note that I tried the same code on different sets of documents and the issue is not consistent.

System Info

System Information

OS: Linux
OS Version: #29~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Apr 4 14:39:20 UTC 2
Python Version: 3.11.6 (main, Oct 4 2023, 18:31:23) [GCC 11.4.0]

Package Information

langchain_core: 0.1.46
langchain: 0.1.12
langchain_community: 0.0.28
langsmith: 0.1.31
langchain_openai: 0.1.4
langchain_text_splitters: 0.0.1
langchain_weaviate: 0.0.1rc5

Packages not installed (Not Necessarily a Problem)

The following packages were not found:

langgraph
langserve

eyurtsev commented 5 months ago

@MohammedShokr thanks for reporting.

There isn't enough context here to determine whether the bug is in the SQLRecordManager or in another component in LangChain or in user code.

It would help if you could isolate the problem or provide a minimal reproducible script. The script should contain all relevant imports and the data to index.

MohammedShokr commented 5 months ago

Hi @eyurtsev, I understand your concern about isolating the issue. Unfortunately, I'm unable to share the data because it's confidential. However, I have checked that the input is consistent: I saved the docs as a pickle file before indexing on different runs and compared both page content and metadata, and they are identical.

MohammedShokr commented 5 months ago

Update: I've identified the root cause of the issue. It stems from the order of the document list and the batch size. Since the summaries are appended to the end of the document list, they end up being indexed in a separate batch, triggering updates. To address this, I've increased the batch size to cover all my documents. However, I'm still wondering how to make sure that all document chunks are consistently included in the same batch. Any insights or suggestions on how to achieve this would be greatly appreciated.

eyurtsev commented 5 months ago

@MohammedShokr for a minimal reproducible example, it's enough to seed this with a test case involving fake data.

documents = [
    Document(page_content='hello', metadata={'source': 1}),
    Document(page_content='goodbye', metadata={'source': 2}),
    Document(page_content='meow', metadata={'source': 3}),
    Document(page_content='woof', metadata={'source': 1})
]

Is the claim that indexing this with a batch size of 2 produces incorrect results? If you're able to create a test case like that, together with what you see versus what you expect to see, that would be very helpful for us in fixing the issue.

MohammedShokr commented 5 months ago

Here's a script to reproduce the issue. Run it twice and you will see that the record manager re-ingests part of the first document (source 1), because one of its chunks landed in a different batch.

from langchain.indexes import SQLRecordManager, index
from langchain.docstore.document import Document
from langchain_weaviate import WeaviateVectorStore
from langchain_openai import OpenAIEmbeddings
from dotenv import load_dotenv
import os
import weaviate

# Load env variables
_ = load_dotenv()
WEAVIATE_API_KEY = os.environ["WEAVIATE_API_KEY"]
WEAVIATE_HTTP_HOST = os.environ["WEAVIATE_HTTP_HOST"]
WEAVIATE_HTTP_PORT = os.environ["WEAVIATE_HTTP_PORT"]
WEAVIATE_GRPC_HOST = os.environ["WEAVIATE_GRPC_HOST"]
WEAVIATE_GRPC_PORT = os.environ["WEAVIATE_GRPC_PORT"]
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

# Create a Weaviate client and connect to it
client = weaviate.client.WeaviateClient(
    connection_params=weaviate.connect.ConnectionParams.from_params(
        http_host=WEAVIATE_HTTP_HOST,
        http_port=WEAVIATE_HTTP_PORT,
        http_secure=False,
        grpc_host=WEAVIATE_GRPC_HOST,
        grpc_port=WEAVIATE_GRPC_PORT,
        grpc_secure=False,
    ),
    auth_client_secret=weaviate.auth.AuthApiKey(api_key=WEAVIATE_API_KEY),
)
client.connect()

# Create a Weaviate vectorstore
index_name = "Test_index"
vectorstore = WeaviateVectorStore(
    client,
    index_name=index_name,
    embedding=OpenAIEmbeddings(),
    text_key="text",
)

# Create the record manager
namespace = f"weaviate/{index_name}"
record_manager = SQLRecordManager(
    namespace, db_url="sqlite:///record_manager_cache.sql"
)

record_manager.create_schema()

# Define the documents
documents = [
    Document(page_content='hello', metadata={'source': 1}),
    Document(page_content='goodbye', metadata={'source': 2}),
    Document(page_content='meow', metadata={'source': 3}),
    Document(page_content='woof', metadata={'source': 1}),
]

# Index the documents
results = index(
    documents,
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
    batch_size=2,
)
client.close()
print(results)

eyurtsev commented 5 months ago

OK I recreated the issue.

Incremental mode was written to support continuous cleanup (i.e., to minimize the amount of time during which duplicated content exists in the index). This only really works if all content derived from the same document is present in the same batch.

If this condition is not met, the indexing code will not be able to avoid some redundant work (i.e., it will end up forcefully re-indexing content that it should have skipped). The end state of the index is still correct (as long as there was no network failure in the middle, etc.).

What I need to do is:

  1. Clarify this in the documentation.
  2. Potentially add another indexing mode that sits between incremental and full. Such a mode would not be able to minimize the duplicated content present in the index between batches.
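The redundant work described above can be illustrated with a toy model that involves no vector store at all. This is a sketch under stated assumptions, not the actual `index()` implementation: it assumes that each batch first refreshes or adds its own records and then immediately deletes every record that shares a source id with the batch but has not yet been touched in the current run. The function `simulate_index` and the record layout are hypothetical names used only for illustration.

```python
from itertools import count


def simulate_index(docs, records, clock, batch_size):
    """Toy model of incremental cleanup (an assumption, not LangChain code).

    docs:    list of (content, source) tuples
    records: dict mapping (content, source) -> {"source", "updated_at"}
    clock:   iterator yielding increasing integers, one per indexing run
    """
    start = next(clock)  # stands in for the timestamp taken when a run begins
    stats = {"num_added": 0, "num_skipped": 0, "num_deleted": 0}
    for i in range(0, len(docs), batch_size):
        batch = docs[i:i + batch_size]
        for content, source in batch:
            key = (content, source)
            if key in records:
                stats["num_skipped"] += 1  # unchanged: only refresh its timestamp
            else:
                stats["num_added"] += 1  # unknown: would be written to the store
            records[key] = {"source": source, "updated_at": start}
        # Per-batch cleanup: drop records that share a source with this batch
        # but have not been refreshed during the current run.
        sources = {source for _, source in batch}
        stale = [
            key
            for key, rec in records.items()
            if rec["source"] in sources and rec["updated_at"] < start
        ]
        for key in stale:
            del records[key]
        stats["num_deleted"] += len(stale)
    return stats


docs = [("hello", 1), ("goodbye", 2), ("meow", 3), ("woof", 1)]
records, clock = {}, count()

first = simulate_index(docs, records, clock, batch_size=2)   # initial load
second = simulate_index(docs, records, clock, batch_size=2)  # identical rerun
third = simulate_index(docs, records, clock, batch_size=4)   # one batch only
```

Under these assumptions, the identical rerun with a batch size of 2 deletes `woof` while cleaning up after the first batch (it shares source 1 with `hello` but has not been refreshed yet) and then adds it back in the second batch, so `second` reports one added, three skipped, and one deleted. With a batch size of 4, `third` reports four skipped and nothing added or deleted.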

MohammedShokr commented 5 months ago

Thank you for clarifying the situation.

I'm considering increasing the batch_size to cover my batch of documents (I need to re-index around 5k docs every day). However, I'm hesitant due to potential consequences. Could you please confirm whether increasing the batch size will effectively mitigate the issue of redundant work while potentially slowing down the cleanup process?

eyurtsev commented 5 months ago

That's correct: it will decrease redundant work, but increase time when duplicates might exist.

You can handle the issue entirely on your side by grouping documents that share the same source id into the same batch, and controlling the batch size dynamically.

I haven't checked, but I hope that the indexing API works without a batch size. If that's the case, you should be able to control the indexing behavior entirely, without the oddity of having to calculate a batch size dynamically.
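The grouping suggested above could look roughly like the following. This is a minimal sketch, not part of LangChain: `batches_by_source` is a hypothetical helper, and `Doc` is a stand-in dataclass with the same `page_content` and `metadata` attributes as a LangChain `Document`, so that the example runs without any dependencies. It assumes every document carries the source id in its metadata.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Doc:
    """Stand-in for a LangChain Document (hypothetical, for illustration)."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def batches_by_source(docs, source_id_key="source", max_batch_size=100):
    """Yield batches in which documents sharing a source id stay together.

    A group larger than max_batch_size is kept whole, so a batch may exceed
    the limit rather than split a source across batches.
    """
    groups = defaultdict(list)
    for doc in docs:
        groups[doc.metadata[source_id_key]].append(doc)

    batch = []
    for group in groups.values():
        # Start a new batch if adding this group would overflow the current one.
        if batch and len(batch) + len(group) > max_batch_size:
            yield batch
            batch = []
        batch.extend(group)
    if batch:
        yield batch


docs = [
    Doc("hello", {"source": 1}),
    Doc("goodbye", {"source": 2}),
    Doc("meow", {"source": 3}),
    Doc("woof", {"source": 1}),
]
batches = list(batches_by_source(docs, max_batch_size=2))

# Each batch could then be indexed on its own (untested assumption), e.g.:
# for batch in batches:
#     index(batch, record_manager, vectorstore, cleanup="incremental",
#           source_id_key="source", batch_size=len(batch))
```

With the four example documents and `max_batch_size=2`, `hello` and `woof` (both source 1) end up in the first batch, and `goodbye` and `meow` in the second. Keeping oversized groups whole trades a strict size limit for the guarantee that no source is split.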

MohammedShokr commented 5 months ago

Thank you for the insight! Closing the issue now.