milvus-io / milvus

A cloud-native vector database, storage for next generation AI applications
https://milvus.io
Apache License 2.0

[Bug]: the distances to a query point's 5nn are incorrect #24931

Closed: aadharna closed this issue 1 year ago

aadharna commented 1 year ago

### Is there an existing issue for this?

- [x] I have searched the existing issues

### Environment

- Milvus version: v2.2.9-lite
- Deployment mode(standalone or cluster): standalone
- MQ type(rocksmq, pulsar or kafka): ??
- SDK version(e.g. pymilvus v2.0.0rc2): pymilvus
- OS(Ubuntu or CentOS): windows # I can run this in WSL as well if that'll be helpful
- CPU/Memory: amd 3950x
- GPU: 3080
- Others: ?

### Current Behavior

When using

```python
index_params = {
    "index_type": "FLAT",
    "metric_type": "L2",
    # "params": {"nlist": 128},
}
```

the distances that Milvus returns for the query point's 5 nearest neighbors are incorrect.

### Expected Behavior

When I search a provided collection of 4-dimensional vectors, I should get the following distances from a specific query point to its 5 nearest neighbors: [0.0, 5.48, 5.92, 6.08, 6.24]

Milvus is returning: [0.0, 30.0, 35.0, 37.0, 39.0]

### Steps To Reproduce

Here's the scratch file I threw together:

```python
import os
import pickle

import numpy as np
from milvus import default_server
from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, connections, utility

COLLECTION_NAME = "monies"
DIMENSION = 4
BATCH_SIZE = 128
TOPK = 5

# start the embedded Milvus Lite server and connect to it
default_server.start()
connections.connect(host="127.0.0.1", port=default_server.listen_port)

print(utility.get_server_version())

if utility.has_collection(COLLECTION_NAME):
    utility.drop_collection(COLLECTION_NAME)

# auto-generated INT64 primary key plus one 4-dim float vector per row
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION),
]
schema = CollectionSchema(fields=fields)
collection = Collection(name=COLLECTION_NAME, schema=schema)

# FLAT = exact brute-force search
index_params = {
    "index_type": "FLAT",
    "metric_type": "L2",
    # "params": {"nlist": 128},
}
collection.create_index(field_name="embedding", index_params=index_params)
collection.load()

# I'll attach a pointer to this file
data_loc = os.path.join('wandb', 'run-20230614_205846-4y15deoj', 'files', 'archive.pkl')

with open(data_loc, 'rb') as f:
    archives = pickle.load(f)
novelty_vector_archive = archives['archive'].astype('float32')

# add to database in batches; with auto_id=True only the vector column is passed
for start in range(0, len(novelty_vector_archive), BATCH_SIZE):
    batch = novelty_vector_archive[start:start + BATCH_SIZE]
    collection.insert([batch.tolist()])

collection.flush()

# search for similar vectors
query_embedding = [[20., 9., 25., 37.]]  # this is a point in the novelty vector space

search_params = {"metric_type": "L2"}

results = collection.search(
    data=query_embedding, anns_field="embedding", param=search_params, limit=TOPK
)

# ground truth: Euclidean distances computed with numpy
dists = np.linalg.norm(novelty_vector_archive - query_embedding[0], axis=1)
nearest = np.sort(dists)[:TOPK]

print(nearest.round(2).tolist())
# Milvus reports squared L2 here; np.sqrt(results[0].distances) matches the line above
print(results[0].distances)

default_server.stop()
```


### Milvus Log

_No response_

### Anything else?

Here's a link to the pkl file uploaded to my Google Drive: https://drive.google.com/file/d/1aC22dCxjTLYjyMuTRhgbbeZeDGF-6iK1/view?usp=sharing

xiaofan-luan commented 1 year ago


Hi @aadharna, here are the metric details: https://milvus.io/docs/metric.md#Similarity-Metrics. For L2, Milvus doesn't take the square root, for performance reasons; because the square root is monotonic, this doesn't change the ranking of the results. You can always take the square root yourself if you want the exact Euclidean distance.
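A minimal sketch of that conversion in plain Python, applied to the distances reported in this issue. The helper name `milvus_l2_to_euclidean` is hypothetical and not part of pymilvus; in the repro script it would be called on `results[0].distances`.

```python
import math


def milvus_l2_to_euclidean(distances):
    """Convert Milvus L2 search distances (squared Euclidean) to Euclidean distances."""
    # sqrt is monotonic, so the order of the hits is unchanged
    return [math.sqrt(d) for d in distances]


milvus_distances = [0.0, 30.0, 35.0, 37.0, 39.0]  # values Milvus returned above
print([round(d, 2) for d in milvus_l2_to_euclidean(milvus_distances)])
# → [0.0, 5.48, 5.92, 6.08, 6.24]
```

The rounded values match the numpy ground truth from the repro script exactly.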

aadharna commented 1 year ago

Oh wow. How did I not notice that?! Perfect, I can take the square root myself!

Thanks for such a prompt response.

aadharna commented 1 year ago

Quick follow-up question while I've got you here.

So, I am working in a reinforcement learning setting, and one of the main tricks we use for parallelization is to run many instances of the learning environment at once (1024).

In my specific case, each copy of the environment previously maintained and grew its own matrix that I would compute similarity / kNN against. This worked well until I needed to start doing really long runs: the kNN calculations were really slowing me down once each comparison matrix reached 15000 x 4 (and there are 1024 envs, each with its own matrix).

Is it possible for me to have multiple processes all point at the same underlying milvus database?

yanliang567 commented 1 year ago

/assign @xiaofan-luan

aadharna commented 1 year ago

For some more context of what we usually do in RL:

Here's an example of making vectorized environments:

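```python
import gymnasium as gym

# `args` is the parsed command-line namespace of the training script
env_config = {'n': 4,
              'e': 1,
              'k': 5,
              'delta': args.novelty_threshold,
              # key must match what MoneyDistributionEnv reads ('only_append_novel')
              'only_append_novel': args.only_save_novel,
              'money_limit': 100,
              }


def make_env():
    def thunk():
        env = MoneyDistributionEnv(config_dict=env_config)
        # expose per-env spaces under the names the training loop expects
        env.single_observation_space = env.observation_space
        env.single_action_space = env.action_space
        return env

    return thunk


# SyncVectorEnv steps every env copy sequentially inside a single process
envs = gym.vector.SyncVectorEnv([make_env() for _ in range(args.num_envs)])
```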

And then here's the actual gym class that gets instantiated many times (with, in this case, each one maintaining its own matrix):

```python
import numpy as np
import gymnasium as gym
from gymnasium import spaces


# money distributed env where the agent gives the other n agents all money at once
class MoneyDistributionEnv(gym.Env):
    def __init__(self, config_dict):
        self.n = config_dict.get('n', 4)
        self.e = config_dict.get('e', 1)
        self.k = config_dict.get('k', 5)
        self.delta = config_dict.get('delta', 1.5)
        self.only_append_novel = config_dict.get('only_append_novel', True)
        self.money_limit = config_dict.get('money_limit', 100)
        self.action_space = spaces.MultiDiscrete([self.money_limit for _ in range(self.n)])
        self.observation_space = spaces.Box(low=0, high=self.money_limit, shape=(self.n,), dtype=float)
        # per-env archive of previously seen distributions; starts with a single zero row
        self.previous_money_distributions = np.zeros((1, self.n))
        self.raw_dists = []
        self.ranking = config_dict.get('rank', False)
        self.mn = np.zeros(self.n)
        self.reset()

    def mean(self, an):
        # online mean for each axis of the archive
        n = len(self.previous_money_distributions)
        self.mn = self.mn + (an - self.mn) / n
        return self.mn

    def step(self, action):
        info = {}
        reward, done, cleaned_action = self._get_reward(action)
        # archive every vector, or only those whose novelty reward exceeds delta
        if not self.only_append_novel or reward > self.delta:
            self.previous_money_distributions = np.vstack((self.previous_money_distributions, action))
        # return to the agent the mean of the archive
        # this should let the agent pick something new?
        obs = self.mean(cleaned_action)
        # gymnasium expects (obs, reward, terminated, truncated, info) with boolean flags
        return obs, reward, done, False, info

    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        return self.mn, {}

    def _get_reward(self, obs):
        if self.previous_money_distributions.shape[0] < 2:
            reward = 1
        else:
            current_money_distribution = np.array(obs)
            dists = np.linalg.norm(self.previous_money_distributions - current_money_distribution, axis=1)
            # novelty = mean distance to the k nearest archived distributions
            reward = np.mean(np.sort(dists)[:self.k])
        if np.sum(obs) > self.money_limit:
            reward = -1
            # obs = [0 for _ in range(self.n)]
        return reward, True, obs
```

So the ideal setup would still have many environment instances evaluating many points at once, but each one would point at the same underlying database, so that only one big matrix of previously novel solutions needs to be maintained.
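Because `gym.vector.SyncVectorEnv` steps every environment copy inside one process, one option short of a database is to hand a single archive object to every environment by reference. The sketch below assumes that setup; `SharedArchive`, its methods, and the idea of passing it to each env (for example through a hypothetical `'archive'` entry in `env_config`) are illustrative, not part of gymnasium or Milvus. It also computes the same reward as `_get_reward` (mean distance to the k nearest archived vectors), but for a whole batch of query points at once.

```python
import numpy as np


class SharedArchive:
    """Hypothetical growable archive shared by reference between env instances."""

    def __init__(self, dim: int, capacity: int = 1024):
        # pre-allocated buffer; only the first `_size` rows hold real vectors
        self._data = np.zeros((max(capacity, 1), dim), dtype=np.float64)
        self._size = 0

    def __len__(self) -> int:
        return self._size

    @property
    def vectors(self) -> np.ndarray:
        # view of the filled rows (no copy)
        return self._data[: self._size]

    def add(self, vector) -> None:
        # double the buffer when full instead of np.vstack-ing on every insert
        if self._size == self._data.shape[0]:
            self._data = np.concatenate([self._data, np.zeros_like(self._data)])
        self._data[self._size] = vector
        self._size += 1

    def knn_novelty(self, queries, k: int = 5) -> np.ndarray:
        """Mean Euclidean distance from each query row to its k nearest archived rows."""
        q = np.atleast_2d(np.asarray(queries, dtype=np.float64))
        a = self.vectors
        if len(a) == 0:
            raise ValueError("archive is empty")
        # all pairwise squared distances at once, shape (num_queries, archive_size):
        # ||q - a||^2 = ||q||^2 + ||a||^2 - 2 q.a
        sq = (q * q).sum(axis=1)[:, None] + (a * a).sum(axis=1)[None, :] - 2.0 * q @ a.T
        np.maximum(sq, 0.0, out=sq)  # clip tiny negatives caused by rounding
        k = min(k, len(a))
        # the k smallest squared distances per row (in no particular order), no full sort
        nearest = np.partition(sq, k - 1, axis=1)[:, :k]
        return np.sqrt(nearest).mean(axis=1)
```

The expansion keeps the intermediate array at (queries × archive size) rather than (queries × archive size × dim), and `np.partition` avoids sorting each row fully. This sharing only works while all envs live in one process; with `gym.vector.AsyncVectorEnv` or other multi-process setups each worker gets its own copy, which is where a shared server such as Milvus becomes relevant.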

aadharna commented 1 year ago

An additional follow-up question: what do you do if the dimension of the embedding vectors grows over time?

e.g., for training steps 1-50 the embedding vector is 4-dimensional, but then after every 50 training steps, the embedding size grows by 1 to account for newly taught agents? [a1, a2, a3, a4] --> [a1, a2, a3, a4, a5]

Currently, I'm storing information in dictionaries where I can just add a new key that's keyed to the newly created agent.

yanliang567 commented 1 year ago

/assign @liliu-z any ideas?

xiaofan-luan commented 1 year ago


Quick question:

So you are saying all clients share the same collection, but after some iterations the vector dimension grows from 4 to 5, then 6, and so on.

  1. How do you sync between the different clients? Is there a synchronous epoch?
  2. If all clients can switch from 4 dim to 5 dim synchronously, maybe you can use a collection alias: simply create a new collection with dim 5 and alter the alias to point to it.

Does that help in your case?
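A minimal sketch of the alias approach, assuming pymilvus 2.2 with `connections.connect(...)` already called in the process doing the switch. `utility.create_alias` and `utility.alter_alias` are pymilvus calls; the `<alias>_dim<dim>` naming scheme and both helper functions are hypothetical, and copying existing vectors into the new collection is left out.

```python
def versioned_collection_name(alias: str, dim: int) -> str:
    # hypothetical naming scheme: one physical collection per embedding size
    return f"{alias}_dim{dim}"


def switch_alias_to_dim(alias: str, dim: int, alias_exists: bool) -> str:
    """Create a collection for `dim`-sized vectors and point `alias` at it."""
    # imported here so the naming helper above works without pymilvus installed
    from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, utility

    name = versioned_collection_name(alias, dim)
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dim),
    ]
    collection = Collection(name=name, schema=CollectionSchema(fields=fields))
    collection.create_index(
        field_name="embedding",
        index_params={"index_type": "FLAT", "metric_type": "L2"},
    )
    collection.load()
    # repoint the alias; clients keep using the alias name throughout
    if alias_exists:
        utility.alter_alias(collection_name=name, alias=alias)
    else:
        utility.create_alias(collection_name=name, alias=alias)
    return name
```

Clients would keep opening the collection by alias, e.g. `Collection("monies")`, and should see the new collection after the switch. One assumption worth checking: an alias probably cannot reuse the name of an existing collection, so the original `monies` collection would need a different name first.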

yanliang567 commented 1 year ago

/assign @aadharna

aadharna commented 1 year ago

My apologies for the delayed response.

With some optimization, I got a single-process environment to be fast enough that I didn't need to keep and run several access points into the database at once.

The results held up in my simple domain, so I'm going to close this. If I end up needing similar speedups in my new domains, or the data grows to the point where a database becomes necessary again, I'll reach back out.