milvus-io / milvus

A cloud-native vector database, storage for next generation AI applications
https://milvus.io
Apache License 2.0

[Bug]: milvus_haystack takes more than 3 seconds to write embeddings #34635

Open yoke233 opened 1 month ago

yoke233 commented 1 month ago

Is there an existing issue for this?

Environment

- Milvus version: 2.4.5
- Deployment mode(standalone or cluster): standalone 
- MQ type(rocksmq, pulsar or kafka):    
- SDK version(e.g. pymilvus v2.0.0rc2): 2.4.4
- OS(Ubuntu or CentOS): Windows
- CPU/Memory: 
- GPU: 
- Others:

Current Behavior


Each embed-and-store step takes more than 3 seconds.

I am using Milvus (through MilvusDocumentStore) as the document store in a Haystack pipeline.

Pipeline:

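from haystack import Pipeline
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DocumentStore
from haystack.utils import Secret
from milvus_haystack import MilvusDocumentStore

# req, config and DocumentSegmentationSplitter come from the reporter's own application code
document_store = MilvusDocumentStore(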
    collection_name=req.collection,
    connection_args={
        # "uri": "./milvus.db",
        "host": "127.0.0.1",
        "port": "19530",
        # "uri": config.get("milvus.uri"),
        # "token": config.get("milvus.token"),
    },
    drop_old=True,
)

def vector_pipe(
    document_store: DocumentStore,
    extra_n: int = 10,
    model: str = "bge-m3",
) -> Pipeline:
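    """
    Build an indexing pipeline: split, embed, then write to document_store.
    extra_n is only used by the commented-out DocumentProposition step.
    """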

    # proposition = DocumentProposition(extra_n=extra_n)
    splitter = DocumentSegmentationSplitter(min_length=200)

    embedder = OpenAIDocumentEmbedder(
        api_key=Secret.from_token(config.get("oneapi.api_key")),
        api_base_url=config.get("oneapi.api_base_url"),
        model=model,
        progress_bar=False,
    )

    pipe = Pipeline()
    pipe.add_component("extra", splitter)
    pipe.add_component("embedder", embedder)
    pipe.add_component("writer", DocumentWriter(document_store))
    pipe.connect("extra", "embedder")
    pipe.connect("embedder", "writer")
    return pipe

Run

for index, doc in enumerate(documents):
    logger.debug(f"extra rag: {doc.meta}")
    one = RepositoryFileBlock()
    one.namespace = req.namespace
    one.collection = req.collection

    one.user_id = req.user_id
    one.file_id = req.file_id
    one.doc_id = doc.id
    one.content = doc.content
    one.meta = doc.meta
    # logger.debug(f"save to db ")
    # save_document_block(session, one)

    logger.debug("store embedding start")
    pipe.run({"extra": {"documents": [doc], "meta": vector_meta}})
    logger.debug("store embedding ok")

    progress_percentage = (index + 1) / total_docs * 100
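The loop above calls pipe.run once per document, so any fixed cost of each DocumentWriter call (at least a gRPC round trip, plus a flush if the store issues one, which is an assumption worth checking) is paid once per document. A minimal sketch, assuming the custom DocumentSegmentationSplitter accepts a list of several documents, is to group documents and run the pipeline once per batch. `batched`, `estimated_calls` and the batch size of 64 are hypothetical, not part of Haystack.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def estimated_calls(n_docs: int, batch_size: int) -> int:
    """Number of pipe.run calls needed when documents are batched."""
    return -(-n_docs // batch_size)  # ceiling division


# Hypothetical usage with the pipeline from this issue:
# for batch in batched(documents, 64):
#     pipe.run({"extra": {"documents": batch, "meta": vector_meta}})
```

With 1,000 documents and a batch size of 64 this is 16 pipeline runs instead of 1,000. Whether it helps depends on where the 3 seconds actually go, which is what the timing scripts in this thread try to isolate.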

Expected Behavior

No response

Steps To Reproduce

No response

Milvus Log

No response

Anything else?

from haystack_integrations.document_stores.qdrant import QdrantDocumentStore

document_store = QdrantDocumentStore(
    url="http://localhost:6333",
    recreate_index=True,
    return_embedding=False,
    wait_result_from_api=True,
    embedding_dim=1024,
)

Switching only the document store to Qdrant makes it faster.

yanliang567 commented 1 month ago

/assign @jaelgu Could you please take a look? @yoke233 Are you running on Milvus Lite? /unassign

yoke233 commented 1 month ago

Milvus is running via Docker Compose.

This may be a bug in MilvusDocumentStore.

jaelgu commented 1 month ago

Forwarded to the milvus-haystack repo: https://github.com/milvus-io/milvus-haystack/issues/29

zc277584121 commented 1 month ago

@yoke233 Can you try this minimal demo? It uses fixed embeddings and no pipeline, which excludes other interfering factors.

import time

from milvus_haystack import MilvusEmbeddingRetriever, MilvusDocumentStore
from haystack import Document

DEFAULT_CONNECTION_ARGS = {
    # "uri": "http://localhost:19530",  # This uri works for Milvus docker service
    "uri": "./milvus_test.db",  # This uri works for Milvus Lite
}

document_store = MilvusDocumentStore(  # or any other vector db instance
    connection_args=DEFAULT_CONNECTION_ARGS,
    consistency_level="Strong",
    drop_old=True,
)

def run(document_store: MilvusDocumentStore):
    documents = []
    doc = Document(
        content="A Foo Document",
        meta={
            "name": "name_0",
            "page": "100",
            "chapter": "intro",
            "number": 2,
            "date": "1969-07-21T20:17:40",
        },
        embedding=[-10.0] * 128,
    )

    for i in range(1000):
        documents.append(doc)
    t0 = time.time()
    document_store.write_documents(documents)
    t1 = time.time()
    print(f"Time to write documents: {t1 - t0} seconds")

    # retriever = MilvusEmbeddingRetriever(
    #     document_store,
    # )
    # query_embedding = [-10.0] * 128
    # res = retriever.run(query_embedding)

if __name__ == '__main__':
    run(document_store)

Output: Time to write documents: 0.7143099308013916 seconds

If everything is working correctly, the reported "Time to write documents" should be a small number.
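To see which phase dominates (for example the write call itself versus a later flush), each step can be timed separately. A small standard-library sketch; `timed` and the `timings` dict are hypothetical names, and `time.perf_counter` is used because it is a monotonic clock intended for measuring intervals.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def timed(label: str, timings: Dict[str, float]) -> Iterator[None]:
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Stored even if the block raises, so partial runs are still visible.
        timings[label] = time.perf_counter() - start


# Hypothetical usage against the demo above:
# timings = {}
# with timed("write_documents", timings):
#     document_store.write_documents(documents)
# print(timings)
```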

zc277584121 commented 3 weeks ago

@yoke233 Could you please provide more information, or simply run the minimal script I provided?

yoke233 commented 10 hours ago

@zc277584121

I installed Milvus in Docker:

# Download the configuration file
$ wget https://github.com/milvus-io/milvus/releases/download/v2.4.9/milvus-standalone-docker-compose.yml -O docker-compose.yml

# Start Milvus
$ sudo docker compose up -d

Then I ran the demo with:

DEFAULT_CONNECTION_ARGS = {
    "uri": "http://localhost:19530",  # This uri works for Milvus docker service
    # "uri": "./milvus_test.db",  # This uri works for Milvus Lite
}

Time to write documents: 4.002654314041138 seconds

zc277584121 commented 5 hours ago

@yoke233 That is a bit strange. Could you please also try this code? It times a single write_documents call, a manual insert of 1,000 rows, and the flush separately.

import time

from milvus_haystack import MilvusEmbeddingRetriever, MilvusDocumentStore
from haystack import Document

DEFAULT_CONNECTION_ARGS = {
    "uri": "http://localhost:19530",  # This uri works for Milvus docker service
    # "uri": "./milvus_test.db",  # This uri works for Milvus Lite
}

document_store = MilvusDocumentStore(  # or any other vector db instance
    connection_args=DEFAULT_CONNECTION_ARGS,
    consistency_level="Strong",
    drop_old=True,
)

def run(document_store: MilvusDocumentStore):
    documents = []
    doc = Document(
        content="A Foo Document",
        meta={
            "name": "name_0",
            "page": "100",
            "chapter": "intro",
            "number": 2,
            "date": "1969-07-21T20:17:40",
        },
        embedding=[-10.0] * 128,
    )

    for i in range(1):
        documents.append(doc)
    t0 = time.time()
    document_store.write_documents(documents)
    t1 = time.time()
    print(f"Time to write 1 document: {t1 - t0} seconds")
    repeat_num = 1000
    # Column-based insert: one list per field, in the collection's schema field order
    insert_list = [
        ['name_0'] * repeat_num,
        ['100'] * repeat_num,
        ['intro'] * repeat_num,
        [2] * repeat_num,
        ['1969-07-21T20:17:40'] * repeat_num,
        ["A Foo Document"] * repeat_num,
        # ['8357ea4daf0a0e5414715b2bbb5a57607bb80d7878e9d8f1efb9aa3e1d1c5951'] * repeat_num,
        [str(i) for i in range(repeat_num)],
        [[-10.0] * 128] * repeat_num
    ]
    t2 = time.time()
    document_store.col.insert(insert_list)
    t3 = time.time()
    print(f"Time to manually write {repeat_num} documents: {t3 - t2} seconds")
    document_store.col.flush()
    t4 = time.time()
    print(f"Time to flush: {t4 - t3} seconds")

    # retriever = MilvusEmbeddingRetriever(
    #     document_store,
    # )
    # query_embedding = [-10.0] * 128
    # res = retriever.run(query_embedding)

if __name__ == '__main__':
    run(document_store)
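The insert_list above is a column-based insert: one list per field, and pymilvus expects those lists in the same order as the collection's schema fields. Writing the lists by hand makes it easy to get them out of order. A minimal sketch that builds them from row dicts instead, assuming you take the field order from the collection schema and leave out any auto-generated primary key; `rows_to_columns` and the field names in the usage comment are hypothetical.

```python
from typing import Any, Dict, List, Sequence


def rows_to_columns(
    rows: Sequence[Dict[str, Any]], field_order: Sequence[str]
) -> List[List[Any]]:
    """Transpose row dicts into column lists in `field_order` (hypothetical helper).

    Raises KeyError if a row is missing one of the fields, so a schema
    mismatch fails loudly instead of silently shifting columns.
    """
    return [[row[name] for row in rows] for name in field_order]


# Hypothetical usage with the collection from the script above:
# field_order = [f.name for f in document_store.col.schema.fields]  # drop auto_id fields if any
# document_store.col.insert(rows_to_columns(rows, field_order))
```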