run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
35.38k stars 4.98k forks source link

[Question]: {'message': 'Failed to send 1 objects in a batch of 1. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'} #15007

Open haozhuoyuan opened 1 month ago

haozhuoyuan commented 1 month ago

Question Validation

Question

import time
import weaviate
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.vector_stores.weaviate import WeaviateVectorStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import StorageContext, Settings
from llama_index.readers.file import PyMuPDFReader
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())
import nest_asyncio
nest_asyncio.apply()  # Only needed in Jupyter notebooks
# Open a client against the local Weaviate instance.
weaviate_client = weaviate.connect_to_local()
weaviate_client.connect()

# LlamaIndex-wide defaults for the LLM and the embedding model.
Settings.llm = OpenAI(temperature=0, model="gpt-4o")
Settings.embed_model = OpenAIEmbedding(
    model="text-embedding-3-small",
    dimensions=512,
)

# Load the documents from ./data1 and split them into chunks of 512 with an
# overlap of 100.
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=100)
documents = SimpleDirectoryReader("./data1").load_data()
nodes = splitter.get_nodes_from_documents(documents)
print(nodes)

# Start from a clean slate: drop the "TextNode" collection if it already exists.
if weaviate_client.collections.exists("TextNode"):
    weaviate_client.collections.delete("TextNode")

# (property name, Weaviate data type) pairs, listed in schema order.
property_types = (
    ("id_", "string"),
    ("embedding", "number[]"),
    ("file_path", "string"),
    ("file_name", "string"),
    ("file_type", "string"),
    ("file_size", "int"),
    ("creation_date", "string"),
    ("last_modified_date", "string"),
    # ("source", "string"),
    ("text", "text"),
    ("start_char_idx", "int"),
    ("end_char_idx", "int"),
    # ("metadata_str", "string"),
    # ("content", "text"),
)
schema = {
    "class": "TextNode",
    "properties": [
        {"name": prop_name, "dataType": [data_type]}
        for prop_name, data_type in property_types
    ],
}
weaviate_client.collections.create_from_dict(schema)
# Ingest the nodes into the "TextNode" collection, then answer questions
# interactively. The client is closed on every exit path.
try:
    collection = weaviate_client.collections.get("TextNode")

    # Build one property dict per node, embedding each chunk up front.
    data_lines = []
    for node in nodes:
        # Generate the embedding for this chunk.
        embedding = Settings.embed_model.get_text_embedding(node.text)
        node.embedding = embedding
        properties = {
            # The key must match the "id_" property declared in the schema.
            # A bare "id" is a reserved property name in Weaviate, so objects
            # carrying it are rejected during the batch import.
            "id_": node.id_,
            "embedding": node.embedding,
            "file_path": node.metadata.get("file_path"),
            "file_name": node.metadata.get("file_name"),
            "file_type": node.metadata.get("file_type"),
            "file_size": node.metadata.get("file_size"),
            "creation_date": node.metadata.get("creation_date"),
            "last_modified_date": node.metadata.get("last_modified_date"),
            # "source": node.metadata.get("source"),
            "text": node.text,
            "start_char_idx": node.start_char_idx,
            "end_char_idx": node.end_char_idx,
            # "metadata_str": node.metadata_template,
            # "content": node.text,
        }
        data_lines.append(properties)
    print(data_lines)

    with collection.batch.dynamic() as batch:
        for data_line in data_lines:
            # Also hand the embedding over as the object's vector; a plain
            # "embedding" property alone is not used for vector search.
            batch.add_object(
                properties=data_line,
                vector=data_line["embedding"],
            )

    # Per-object import errors are collected on the batch rather than raised,
    # so surface them instead of unconditionally reporting success.
    failed_objects = collection.batch.failed_objects
    if failed_objects:
        print(f"{len(failed_objects)} objects failed to import:")
        for failed in failed_objects:
            print(failed)
    else:
        print("node insert completation!!!!!!!!!!!")

    # Wrap the populated collection in a LlamaIndex vector store and index.
    vector_store = WeaviateVectorStore(weaviate_client=weaviate_client, index_name="TextNode")
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex.from_vector_store(vector_store)
    print(index.index_struct)
    print(index.storage_context)

    query_engine = index.as_query_engine()

    # Simple REPL: an empty line ends the session.
    while True:
        question = input("User: ")
        if not question.strip():
            break
        start_time = time.time()
        response = query_engine.query(question)
        end_time = time.time()
        print(f"Time taken: {end_time - start_time} seconds")
        print(f"AI: {response}")
finally:
    weaviate_client.close()

Error message is: {'message': 'Failed to send 1 objects in a batch of 1. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'}

How should I solve it? Thank you.

dosubot[bot] commented 1 month ago

@haozhuoyuan Hello there! I'm here to help you with any bugs, questions, or contributions while you wait for a human maintainer.

To solve the error encountered while trying to insert objects into a Weaviate collection using a batch process, ensure that the batch object is correctly used within the add_node function. Here is the relevant code snippet that shows how to add a node using a batch process:

def add_node(
    client: "WeaviateClient",
    node: BaseNode,
    class_name: str,
    batch: Optional[Any] = None,
    text_key: str = DEFAULT_TEXT_KEY,
) -> None:
    """Queue one node for insertion into the Weaviate class ``class_name``.

    The stored properties are the node's content (under ``text_key``) merged
    with the dict returned by ``node_to_metadata_dict``. The node's id and
    embedding are passed alongside the properties.

    Args:
        client: Weaviate client; its ``batch`` is used when ``batch`` is None.
        node: Node to store.
        class_name: Name of the target Weaviate class.
        batch: Optional batch object (e.g. from a context manager) to add to.
        text_key: Property name under which the node content is stored.
    """
    content = node.get_content(metadata_mode=MetadataMode.NONE) or ""
    properties = {text_key: content}
    properties.update(
        node_to_metadata_dict(node, remove_text=True, flat_metadata=False)
    )

    embedding = node.get_embedding()
    node_id = node.node_id

    # An explicitly supplied batch takes precedence over the client's own.
    target = batch if batch is not None else client.batch
    target.add_data_object(properties, class_name, node_id, embedding)

Ensure that the batch object is correctly instantiated and passed to the add_node function. If no batch object is provided, the function falls back to the client's default batch (client.batch) and queues the object there.

Additionally, you can refer to the following test case to understand how to mock and test the batch process in Weaviate:

import sys
from unittest.mock import MagicMock

from llama_index.legacy.schema import NodeRelationship, RelatedNodeInfo, TextNode
from llama_index.legacy.vector_stores.weaviate import WeaviateVectorStore

def test_weaviate_add() -> None:
    """Check that ``WeaviateVectorStore.add`` passes the embedding to the batch."""
    # Swap in a mock for the ``weaviate`` module.
    sys.modules["weaviate"] = MagicMock()

    client = MagicMock()
    batch = MagicMock()
    # Entering ``client.batch`` as a context manager yields our mock batch.
    client.batch.__enter__.return_value = batch

    store = WeaviateVectorStore(weaviate_client=client)

    source_ref = RelatedNodeInfo(node_id="test doc id")
    node = TextNode(
        text="test node text",
        id_="test node id",
        relationships={NodeRelationship.SOURCE: source_ref},
        embedding=[0.5, 0.5],
    )
    store.add([node])

    # The embedding is expected as the last positional argument.
    positional_args, _ = batch.add_data_object.call_args
    assert positional_args[-1] == [0.5, 0.5]

This test case demonstrates how to mock the Weaviate client and batch context manager to ensure that the batch process is correctly handled.

Furthermore, the add method in the WeaviateVectorStore class shows how to use the batch process within a context manager:

def add(
    self,
    nodes: List[BaseNode],
    **add_kwargs: Any,
) -> List[str]:
    """Write the given nodes to the index through one dynamic batch.

    Args:
        nodes: Nodes (with embeddings) to store.
        **add_kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The ``node_id`` of every node, in input order.
    """
    node_ids = [node.node_id for node in nodes]

    # Every node goes through the same batch opened on the client.
    with self._client.batch.dynamic() as batch_writer:
        for item in nodes:
            add_node(
                self._client,
                item,
                self.index_name,
                batch=batch_writer,
                text_key=self.text_key,
            )

    return node_ids

This method ensures that the batch process is correctly managed and that nodes are added to the Weaviate collection efficiently [1][2][3][4].

To continue talking to Dosu, mention @dosu.