milvus-io / milvus

A cloud-native vector database, storage for next generation AI applications
https://milvus.io
Apache License 2.0

[Bug]: [major compaction] Major compaction hangs when "usePartitionKeyAsClusteringKey" and "useVectorAsClusteringKey" are both enabled on a collection with a partition key field #32371

Open binbinlv opened 5 months ago

binbinlv commented 5 months ago

Is there an existing issue for this?

Environment

- Milvus version: lru_dev branch (latest)
- Deployment mode (standalone or cluster): both
- MQ type (rocksmq, pulsar or kafka): all
- SDK version (e.g. pymilvus v2.0.0rc2): dev (latest)
- OS (Ubuntu or CentOS):
- CPU/Memory:
- GPU:
- Others:

Current Behavior

Major compaction hangs when "usePartitionKeyAsClusteringKey" and "useVectorAsClusteringKey" are both enabled on a collection with a partition key field.

Expected Behavior

Major compaction completes successfully.

Steps To Reproduce

  1. Set "useVectorAsClusteringKey" to true in milvus.yaml.
  2. Set "usePartitionKeyAsClusteringKey" to true in milvus.yaml.
  3. Run the following script:
import os
import time
import random
import string
import numpy as np
from pymilvus import (
    connections,
    utility,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

fmt = "\n=== {:30} ===\n"
dim = 128

print(fmt.format("start connecting to Milvus"))
# Use MILVUS_HOST if set, otherwise fall back to the local default.
host = os.environ.get("MILVUS_HOST", "localhost")
print(fmt.format(f"Milvus host: {host}"))
connections.connect(host=host, port="19530")

default_fields = [
    FieldSchema(name="count", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="key", dtype=DataType.INT64, is_partition_key=True),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="var", dtype=DataType.VARCHAR, max_length=10000),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]
default_schema = CollectionSchema(fields=default_fields, description="test clustering-key collection")
collection_name = "major_compaction_collection_enable_scalar_partition_key_after_index"

if utility.has_collection(collection_name):
    collection = Collection(name=collection_name)
    collection.drop()
    print("drop the original collection")
hello_milvus = Collection(name=collection_name, schema=default_schema)

# major compaction on the still-empty collection (no data inserted yet)
print("Starting major compaction")
start = time.time()
hello_milvus.compact(is_major=True)
res = hello_milvus.get_compaction_state(is_major=True)
print(res)
print("Waiting for major compaction complete")
hello_milvus.wait_for_compaction_completed(is_major=True)
end = time.time()
print("Major compaction complete in %f s" %(end - start))
res = hello_milvus.get_compaction_state(is_major=True)
print(res)

nb = 1000

rng = np.random.default_rng(seed=19530)
random_data = rng.random(nb).tolist()

vec_data = [[random.random() for _ in range(dim)] for _ in range(nb)]
_len = 20
_str = string.ascii_letters + string.digits
_s = _str
print("_str size ", len(_str))

for i in range(int(_len / len(_str))):
    _s += _str
    print("append str ", i)
values = [''.join(random.sample(_s, _len - 1)) for _ in range(nb)]
index = 0
while index < 100:
    # insert data
    data = [
        [index * nb + i for i in range(nb)],
        [random.randint(0,100) for i in range(nb)],
        random_data,
        values,
        vec_data,
    ]
    start = time.time()
    res = hello_milvus.insert(data)
    end = time.time() - start
    print("insert %d %d done in %f" % (index, nb, end))
    index += 1
    hello_milvus.flush()

print(f"Number of entities in Milvus: {hello_milvus.num_entities}")  # check the num_entities

# create the vector index
print(fmt.format("Start Creating index AUTOINDEX"))
# use a separate name so the insert-loop counter `index` is not overwritten
index_params = {
    "index_type": "AUTOINDEX",
    "metric_type": "L2",
    "params": {},
}

print("creating index")
hello_milvus.create_index("embeddings", index_params)
print("waiting for index completed")
utility.wait_for_index_building_complete(collection_name)
res = utility.index_building_progress(collection_name)
print(res)

print(fmt.format("Load"))
hello_milvus.load()

res = utility.get_query_segment_info(collection_name)

print("before major compaction")
print(res)

# major compaction on the indexed, loaded collection

print("Starting major compaction")
start = time.time()
hello_milvus.compact(is_major=True)
res = hello_milvus.get_compaction_state(is_major=True)
print(res)
print("Waiting for major compaction complete")
hello_milvus.wait_for_compaction_completed(is_major=True)
end = time.time()
print("Major compaction complete in %f s" %(end - start))
res = hello_milvus.get_compaction_state(is_major=True)
print(res)

res = utility.get_query_segment_info(collection_name)
print("after major compaction")
print(res)

nb = 1
vectors = [[random.random() for _ in range(dim)] for _ in range(nb)]

nq = 1

default_search_params = {"metric_type": "L2", "params": {}}
res1 = hello_milvus.search(vectors[:nq], "embeddings", default_search_params, 10, "count >= 0")

print(res1[0].ids)
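In the script above, `wait_for_compaction_completed(is_major=True)` is called without a timeout, so a compaction that never leaves the Executing state blocks the script indefinitely. A minimal sketch of a bounded wait follows. `wait_for_major_compaction` is a hypothetical helper, not part of pymilvus, and it assumes (based on the `CompactionState` output printed in this thread) that `get_compaction_state(is_major=True)` returns an object whose `state` reads as `Completed` once the compaction has finished.

```python
import time


def wait_for_major_compaction(collection, timeout=600.0, interval=1.0):
    """Poll the major-compaction state until it completes or `timeout` seconds pass.

    Hypothetical helper: assumes `collection.get_compaction_state(is_major=True)`
    returns an object with a `state` attribute that reads "Completed" when done.
    Returns the last state object; raises TimeoutError if the deadline passes.
    """
    deadline = time.monotonic() + timeout
    while True:
        res = collection.get_compaction_state(is_major=True)
        # Accept an Enum member (compare its .name) or any value with a str() form.
        state_name = getattr(res.state, "name", str(res.state))
        if state_name == "Completed":
            return res
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"major compaction still {state_name} after {timeout}s: {res}"
            )
        time.sleep(interval)


# Usage in the reproduction script, replacing the unbounded wait:
#   hello_milvus.compact(is_major=True)
#   res = wait_for_major_compaction(hello_milvus, timeout=600)
```

A hang then surfaces as a `TimeoutError` carrying the last observed state, which can be attached to the report instead of leaving the script stuck.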

Milvus Log

collection name: major_compaction_collection_enable_scalar_partition_key_after_index

https://grafana-4am.zilliz.cc/explore?orgId=1&left=%7B%22datasource%22:%22Loki%22,%22queries%22:%5B%7B%22refId%22:%22A%22,%22expr%22:%22%7Bcluster%3D%5C%22devops%5C%22,namespace%3D%5C%22chaos-testing%5C%22,pod%3D~%5C%22major-vec-partition-vabea.*%5C%22%7D%22%7D%5D,%22range%22:%7B%22from%22:%22now-1h%22,%22to%22:%22now%22%7D%7D

Anything else?

No response

binbinlv commented 5 months ago

The executing plan number is also strange: it keeps increasing between successive calls.

>>> c.get_compaction_state(is_major=True)

CompactionState
 - compaction id: 449137408157615384
 - State: Executing
 - executing plan number: 86
 - timeout plan number: 0
 - complete plan number: 6

>>> c.get_compaction_state(is_major=True)

CompactionState
 - compaction id: 449137408157615384
 - State: Executing
 - executing plan number: 144
 - timeout plan number: 0
 - complete plan number: 16

>>> c.get_compaction_state(is_major=True)

CompactionState
 - compaction id: 449137408157615384
 - State: Executing
 - executing plan number: 163
 - timeout plan number: 0
 - complete plan number: 22
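The three snapshots above can be compared mechanically. The sketch below parses the printed `CompactionState` text (the line format is inferred from this output, not from a documented API; `parse_compaction_state` and `total_plans` are hypothetical names) and sums the executing, timeout and complete plan counts per snapshot. The totals are 86 + 0 + 6 = 92, 144 + 0 + 16 = 160 and 163 + 0 + 22 = 185, so the number of plans under the same compaction id keeps growing while the compaction runs instead of staying fixed.

```python
import re

# Snapshots copied from the get_compaction_state(is_major=True) output above.
SNAPSHOTS = [
    """CompactionState
 - compaction id: 449137408157615384
 - State: Executing
 - executing plan number: 86
 - timeout plan number: 0
 - complete plan number: 6""",
    """CompactionState
 - compaction id: 449137408157615384
 - State: Executing
 - executing plan number: 144
 - timeout plan number: 0
 - complete plan number: 16""",
    """CompactionState
 - compaction id: 449137408157615384
 - State: Executing
 - executing plan number: 163
 - timeout plan number: 0
 - complete plan number: 22""",
]

PLAN_KEYS = ("executing plan number", "timeout plan number", "complete plan number")


def parse_compaction_state(text):
    """Turn the ' - key: value' lines of a printed CompactionState into a dict."""
    fields = {}
    for line in text.splitlines():
        m = re.match(r"\s*-\s*(.+?):\s*(.+)$", line)
        if m:
            fields[m.group(1)] = m.group(2).strip()
    return fields


def total_plans(fields):
    """Sum executing, timeout and complete plans for one snapshot."""
    return sum(int(fields[k]) for k in PLAN_KEYS)


totals = [total_plans(parse_compaction_state(s)) for s in SNAPSHOTS]
print(totals)  # -> [92, 160, 185]
```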
stale[bot] commented 4 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

yanliang567 commented 4 months ago

keep it

stale[bot] commented 2 months ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

yanliang567 commented 2 months ago

Any updates on this?