apache / kvrocks

Apache Kvrocks is a distributed key-value NoSQL database that uses RocksDB as its storage engine and is compatible with the Redis protocol.
https://kvrocks.apache.org/
Apache License 2.0

Benchmark for vector indexing and searching #2481

Open PragmaTwice opened 3 months ago

PragmaTwice commented 3 months ago

Search before asking

Motivation

We can try to use this repo: https://github.com/qdrant/vector-db-benchmark

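After some simple patching (switching the benchmark's Redis client to FLOAT64 vectors and passing an explicit index definition):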

diff --git a/engine/clients/redis/configure.py b/engine/clients/redis/configure.py
index a5e6fe8..b2b8af0 100644
--- a/engine/clients/redis/configure.py
+++ b/engine/clients/redis/configure.py
@@ -7,6 +7,7 @@ from redis.commands.search.field import (
     TextField,
     VectorField,
 )
+from redis.commands.search.indexDefinition import IndexDefinition, IndexType

 from benchmark.dataset import Dataset
 from engine.base_client.configure import BaseConfigurator
@@ -81,7 +82,7 @@ class RedisConfigurator(BaseConfigurator):
                 name="vector",
                 algorithm="HNSW",
                 attributes={
-                    "TYPE": "FLOAT32",
+                    "TYPE": "FLOAT64",
                     "DIM": dataset.config.vector_size,
                     "DISTANCE_METRIC": self.DISTANCE_MAPPING[dataset.config.distance],
                     **self.collection_params.get("hnsw_config", {}),
@@ -97,8 +98,13 @@ class RedisConfigurator(BaseConfigurator):
             ]
         for conn in conns:
             search_namespace = conn.ft()
+            idef = IndexDefinition(
+                prefix=[''],
+                index_type=IndexType.HASH
+            )
+            idef.args = idef.args[:-2]
             try:
-                search_namespace.create_index(fields=index_fields)
+                search_namespace.create_index(fields=index_fields, definition=idef)
             except redis.ResponseError as e:
                 if "Index already exists" not in str(e):
                     raise e
diff --git a/engine/clients/redis/upload.py b/engine/clients/redis/upload.py
index cd4b888..0e29533 100644
--- a/engine/clients/redis/upload.py
+++ b/engine/clients/redis/upload.py
@@ -57,7 +57,7 @@ class RedisUploader(BaseUploader):
             cls.client.hset(
                 str(idx),
                 mapping={
-                    "vector": np.array(vec).astype(np.float32).tobytes(),
+                    "vector": np.array(vec).astype(np.float64).tobytes(),
                     **payload,
                     **geopoints,
                 },
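
With this patch, the index `TYPE` and the byte width of each uploaded vector have to agree, which is why both files switch to 64-bit floats together: `np.array(vec).astype(np.float64).tobytes()` emits each component as 8 raw bytes in native byte order with no header, so a glove-25-angular vector becomes a 200-byte hash field instead of 100 bytes. A minimal C++ sketch of that layout; `PackFloat64` and `UnpackFloat64` are hypothetical helpers for illustration, not kvrocks' own decoding code:

```cpp
#include <cstring>
#include <string>
#include <vector>

// Pack doubles into a raw byte string, mirroring numpy's tobytes() for a
// float64 array: each component is copied as its native 8-byte representation,
// with no header or separator.
std::string PackFloat64(const std::vector<double>& vec) {
  std::string blob(vec.size() * sizeof(double), '\0');
  if (!vec.empty()) std::memcpy(blob.data(), vec.data(), blob.size());
  return blob;
}

// Inverse operation (hypothetical helper): reinterpret a raw blob as doubles.
// Returns an empty vector if the length is not a multiple of 8 bytes, e.g. a
// 25-dimensional FLOAT32 payload, which is only 100 bytes long.
std::vector<double> UnpackFloat64(const std::string& blob) {
  if (blob.size() % sizeof(double) != 0) return {};
  std::vector<double> vec(blob.size() / sizeof(double));
  if (!vec.empty()) std::memcpy(vec.data(), blob.data(), blob.size());
  return vec;
}
```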

We can start a kvrocks instance and run this benchmark:

python3 run.py --engines redis-default --datasets 'glove-25-angular'

cc @Beihao-Zhou

Solution

Currently, kvrocks crashes with a coredump during the vector indexing phase, e.g.:

E20240809 14:55:31.865105   134 signal_util.h:36] Stack trace (most recent call first):
#0  0x0000555f0502cac9 in SegvHandler at /usr/bin/kvrocks
#1  0x00007f2e599c004f in __sigaction at /lib/x86_64-linux-gnu/libc.so.6
#2  0x00007f2e59a0ee2c in pthread_key_delete at /lib/x86_64-linux-gnu/libc.so.6
#3  0x00007f2e599bffb1 in gsignal at /lib/x86_64-linux-gnu/libc.so.6
#4  0x00007f2e599aa471 in abort at /lib/x86_64-linux-gnu/libc.so.6
#5  0x0000555f04f77917 in __gnu_cxx::__verbose_terminate_handler() [clone .cold] at vterminate.o
#6  0x0000555f05b16c09 in __cxxabiv1::__terminate(void (*)()) at /usr/bin/kvrocks
#7  0x0000555f05b16c74 in std::terminate() at /usr/bin/kvrocks
#8  0x0000555f05b16dc7 in __cxa_throw at /usr/bin/kvrocks
#9  0x0000555f04f7908d in std::__throw_bad_alloc() at /usr/bin/kvrocks
#10 0x0000555f04f77509 in handleOOM(unsigned long, bool) [clone .cold] at /kvrocks/build/_deps/jemalloc-src/src/jemalloc_cpp.cpp:90
#11 0x0000555f051d6e4a in redis::VectorItem::VectorItem(redis::VectorItem const&) at /usr/bin/kvrocks
#12 0x0000555f051e561a in redis::IndexUpdater::UpdateHnswVectorIndex(std::basic_string_view<char, std::char_traits<char> >, kqir::Value const&, kqir::Value const&, redis::SearchKey const&, redis::HnswVectorFieldMetadata*) const at /usr/bin/kvrocks
#13 0x0000555f051ec7be in redis::IndexUpdater::Update(std::map<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, kqir::Value, std::less<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, kqir::Value> > > const&, std::basic_string_view<char, std::char_traits<char> >) const at /usr/bin/kvrocks
#14 0x0000555f05228549 in redis::Connection::ExecuteCommands(std::deque<std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >, std::allocator<std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > >*) at /usr/bin/kvrocks
#15 0x0000555f0505db30 in EvbufCallbackBase<redis::Connection, true, true, true>::readCB(bufferevent*, void*) at /usr/bin/kvrocks
#16 0x0000555f05978a0d in bufferevent_run_deferred_callbacks_unlocked at bufferevent.c
#17 0x0000555f059808a3 in event_process_active_single_queue at event.c
#18 0x0000555f05980f3e in event_process_active at event.c
#19 0x0000555f05982cf8 in event_base_loop.constprop.0 at event.c
#20 0x0000555f0523d46d in std::thread::_State_impl<std::thread::_Invoker<std::tuple<util::CreateThread<WorkerThread::Start()::{lambda()#1}>(char const*, WorkerThread::Start()::{lambda()#1})::{lambda()#1}> > >::_M_run() at /usr/bin/kvrocks
#21 0x0000555f05b96312 in execute_native_thread_routine at thread.o
#22 0x00007f2e59a0d133 in pthread_condattr_setpshared at /lib/x86_64-linux-gnu/libc.so.6
#23 0x00007f2e59a8ca3f in __clone at /lib/x86_64-linux-gnu/libc.so.6

Maybe we can try to solve it first.

Are you willing to submit a PR?

Beihao-Zhou commented 3 months ago

Thanks for helping benchmark it! I'll fix the bug ASAP.

Beihao-Zhou commented 3 months ago

Hiii @PragmaTwice,

I have some updates on the issue. <3

I tried debugging using gdb with the following steps:

➜  kvrocks git:(unstable) ✗ gdb ./build/kvrocks
(gdb) run -c kvrocks.conf

After running python3 run.py --engines redis-default --datasets 'glove-25-angular', I reproduced the error and obtained the following backtrace:

(gdb) bt
[......]
#9  0x000055555561893a in std::__throw_bad_alloc() ()
#10 0x000055555651e6be in handleOOM (size=14411505886035969, nothrow=<optimized out>)
    at /home/ubuntu/kvrocks/build/_deps/jemalloc-src/src/jemalloc_cpp.cpp:90
#11 0x00005555556c4fb4 in std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::_M_construct<char*> (
    this=0x7ffff19f28a0, __beg=0x33010000000200 <error: Cannot access memory at address 0x33010000000200>, 
    __end=0x66343031040200 <error: Cannot access memory at address 0x66343031040200>)
    at /usr/include/c++/11/bits/basic_string.tcc:219
#12 0x000055555568925a in redis::VectorItem::VectorItem (this=<optimized out>, this=<optimized out>)
    at /home/ubuntu/kvrocks/src/search/hnsw_indexer.h:57
#13 0x00005555559bb5d3 in operator() (__closure=0x7ffff19f25f0, candidate_neighbours=..., selected_neighbours=...)
    at /home/ubuntu/kvrocks/src/search/hnsw_indexer.cc:432
#14 0x00005555559bc4c3 in redis::HnswIndex::InsertVectorEntryInternal (this=0x7ffff19f2a90, key=..., vector=..., batch=..., 
    target_level=0) at /home/ubuntu/kvrocks/src/search/hnsw_indexer.cc:436
#15 0x00005555559bd6f9 in redis::HnswIndex::InsertVectorEntry (this=0x7ffff19f2a90, key=..., vector=..., batch=...)
    at /home/ubuntu/kvrocks/src/search/hnsw_indexer.cc:498
#16 0x00005555559d3e7a in redis::IndexUpdater::UpdateHnswVectorIndex (this=0x7ffff402f000, key=..., original=..., current=..., 
    search_key=..., vector=0x7ffff5c08380) at /home/ubuntu/kvrocks/src/search/indexer.cc:287
#17 0x00005555559d4633 in redis::IndexUpdater::UpdateIndex (this=0x7ffff402f000, field=..., key=..., original=..., current=...)
    at /home/ubuntu/kvrocks/src/search/indexer.cc:314
[......]
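
Frames #10 and #11 already point at a copy from garbage rather than genuine memory exhaustion: the size passed to `handleOOM` is exactly the distance between the two unreadable `__beg`/`__end` pointers plus one byte for the NUL terminator. The compile-time check below only redoes that arithmetic, assuming libstdc++'s usual `length + 1` allocation in `basic_string::_M_construct`:

```cpp
#include <cstdint>

// Values copied from frames #10 and #11 of the gdb backtrace.
constexpr std::uint64_t kBeg = 0x33010000000200ULL;         // __beg
constexpr std::uint64_t kEnd = 0x66343031040200ULL;         // __end
constexpr std::uint64_t kRequested = 14411505886035969ULL;  // handleOOM size

// libstdc++ allocates (end - beg) characters plus one for the terminating NUL.
constexpr std::uint64_t kImpliedAllocation = (kEnd - kBeg) + 1;

static_assert(kImpliedAllocation == kRequested,
              "the failed allocation matches the garbage [beg, end) range");
```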

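From frame #13, we can see that a closure (hnsw_indexer.cc:432) fails while copying a VectorItem, and the process aborts with std::bad_alloc. It turns out that the closure dereferences an end iterator: std::find_if finds no candidate neighbour that is missing from the selected set, and copying the resulting garbage VectorItem triggers a bogus, huge string allocation (frames #10 and #11). The patch at the end temporarily fixes the crash.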
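
The failing pattern in isolation: `std::find_if` returns `end()` when no element matches, and dereferencing that iterator is undefined behaviour. A minimal C++ sketch of the guarded version, using a hypothetical `Item` type (key only) in place of `redis::VectorItem`, and returning `std::optional` where the patch uses `continue`:

```cpp
#include <algorithm>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for redis::VectorItem: only the key.
struct Item {
  std::string key;
  bool operator==(const Item& other) const { return key == other.key; }
};

// Return the first old neighbour of the candidate that did not survive pruning.
// When every old neighbour is still selected, find_if yields candidates.end();
// the original lambda dereferenced it unconditionally, while this version
// reports "nothing pruned" instead.
std::optional<Item> FindPrunedNeighbour(const std::vector<Item>& candidates,
                                        const std::vector<Item>& selected) {
  auto it = std::find_if(candidates.begin(), candidates.end(), [&](const Item& item) {
    return std::find(selected.begin(), selected.end(), item) == selected.end();
  });
  if (it == candidates.end()) return std::nullopt;
  return *it;
}
```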

However, logically, deleted_node_it should never be the end iterator. I suspect this issue is caused by concurrent index-update requests, where individual updates read inconsistent snapshots and corrupt the node metadata. Currently, HnswIndex doesn't handle concurrency.
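
One way inconsistent snapshots could produce such a state: two updates read the same version of a node, one prunes an edge and the other adds one, and each writes back a neighbour count derived from its own snapshot. The hypothetical single-threaded model below (not kvrocks code; all names are illustrative) replays that interleaving. Both edge writes land, but the stored count ends up higher than the real number of edges. A node whose count overstates its neighbours would plausibly be treated as full, yet re-selecting its actual neighbours plus the new node could then prune nothing, leaving find_if with only `end()` to return.

```cpp
#include <cstdint>
#include <optional>
#include <set>
#include <string>

// Hypothetical, heavily simplified model of one HNSW node as stored on disk:
// a neighbour count kept in metadata plus the actual set of edge keys.
struct NodeState {
  std::uint16_t num_neighbours = 0;
  std::set<std::string> edges;
};

// A write batch computed from a snapshot: one edge change plus the new count.
struct Batch {
  std::optional<std::string> add_edge;
  std::optional<std::string> remove_edge;
  std::uint16_t new_count = 0;
};

Batch PrepareAddEdge(const NodeState& snapshot, const std::string& key) {
  return Batch{key, std::nullopt, static_cast<std::uint16_t>(snapshot.num_neighbours + 1)};
}

Batch PrepareRemoveEdge(const NodeState& snapshot, const std::string& key) {
  return Batch{std::nullopt, key, static_cast<std::uint16_t>(snapshot.num_neighbours - 1)};
}

// Edge writes from every batch land, but the count is last-writer-wins.
void Apply(NodeState& db, const Batch& batch) {
  if (batch.add_edge) db.edges.insert(*batch.add_edge);
  if (batch.remove_edge) db.edges.erase(*batch.remove_edge);
  db.num_neighbours = batch.new_count;
}
```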

# Kvrocks log 
I20240812 00:40:37.942303 185593 worker.cc:111] [worker] New connection: fd=56 from port: 6666 thread #140737224099392
I20240812 00:40:37.997772 185593 worker.cc:111] [worker] New connection: fd=57 from port: 6666 thread #140737224099392
...
...
I20240812 00:40:39.056308 185592 worker.cc:111] [worker] New connection: fd=71 from port: 6666 thread #140737232492096

Given this, I guess addressing the consistency issue will likely resolve this crash as well. I saw that PR #2310 seems to address this; should we tackle this issue after that PR is finalized and merged? Let me know if you think there are other possible causes or solutions we should consider, as I'm not entirely certain this is the root cause. Thanks!

Patch

diff --git a/src/search/hnsw_indexer.cc b/src/search/hnsw_indexer.cc
index 3618ad8..e8271b8 100644
--- a/src/search/hnsw_indexer.cc
+++ b/src/search/hnsw_indexer.cc
@@ -385,7 +385,7 @@ Status HnswIndex::InsertVectorEntryInternal(std::string_view key, const kqir::Nu

       // Check if candidate node has room after some other nodes' are pruned in current batch
       auto has_room_after_deletions = [&](const HnswNode& candidate_node, uint16_t candidate_node_num_neighbours) {
-        auto it = deleted_edges_map.find(candidate_node.key);
+        const auto& it = deleted_edges_map.find(candidate_node.key);
         if (it != deleted_edges_map.end()) {
           auto num_deleted_edges = static_cast<uint16_t>(it->second.size());
           return (candidate_node_num_neighbours - num_deleted_edges) < m_max;
@@ -417,27 +417,30 @@ Status HnswIndex::InsertVectorEntryInternal(std::string_view key, const kqir::Nu
             std::find(sorted_neighbours_by_distance.begin(), sorted_neighbours_by_distance.end(),
                       inserted_vector_item) != sorted_neighbours_by_distance.end();

-        if (inserted_node_is_selected) {
-          // Add the edge between candidate and inserted node
-          GET_OR_RET(AddEdge(inserted_vector_item.key, candidate_node.key, level, batch));
-          connected_edges_set.insert(candidate_node.key);
+        if (!inserted_node_is_selected) {
+          continue;
+        }

-          auto find_deleted_item = [&](const std::vector<VectorItem>& candidate_neighbours,
-                                       const std::vector<VectorItem>& selected_neighbours) -> VectorItem {
-            auto it =
-                std::find_if(candidate_neighbours.begin(), candidate_neighbours.end(), [&](const VectorItem& item) {
-                  return std::find(selected_neighbours.begin(), selected_neighbours.end(), item) ==
-                         selected_neighbours.end();
-                });
-            return *it;
-          };
-
-          // Remove the edge for candidate and the pruned node
-          auto deleted_node = find_deleted_item(candidate_node_neighbour_vec_items, sorted_neighbours_by_distance);
-          GET_OR_RET(RemoveEdge(deleted_node.key, candidate_node.key, level, batch));
-          deleted_edges_map[candidate_node.key].insert(deleted_node.key);
-          deleted_edges_map[deleted_node.key].insert(candidate_node.key);
+        auto deleted_node_it =
+            std::find_if(candidate_node_neighbour_vec_items.begin(), candidate_node_neighbour_vec_items.end(),
+                         [&](const VectorItem& item) {
+                           return std::find(sorted_neighbours_by_distance.begin(), sorted_neighbours_by_distance.end(),
+                                            item) == sorted_neighbours_by_distance.end();
+                         });
+
+        if (deleted_node_it == candidate_node_neighbour_vec_items.end()) {
+          continue;
         }
+        auto deleted_node = *deleted_node_it;
+
+        // Remove the edge for candidate and the pruned node
+        GET_OR_RET(RemoveEdge(deleted_node.key, candidate_node.key, level, batch));
+        deleted_edges_map[candidate_node.key].insert(deleted_node.key);
+        deleted_edges_map[deleted_node.key].insert(candidate_node.key);
+
+        // Add the edge between candidate and inserted node
+        GET_OR_RET(AddEdge(inserted_vector_item.key, candidate_node.key, level, batch));
+        connected_edges_set.insert(candidate_node.key);
       }

       // Update inserted node metadata
PragmaTwice commented 3 months ago

Thank you for looking into this!

I agree that it's likely related to rocksdb concurrency and transaction management. However, besides ensuring the consistency of rocksdb snapshots, do we need to consider anything else when constructing HNSW indexes in multiple threads? Should we also introduce some locks?

Beihao-Zhou commented 3 months ago

> Thank you for looking into this!
>
> I agree that it's likely related to rocksdb concurrency and transaction management. However, besides ensuring the consistency of rocksdb snapshots, do we need to consider anything else when constructing HNSW indexes in multiple threads? Should we also introduce some locks?

Yeah, introducing locks sounds good to me. I also think this is probably a common issue for all index types rather than only HNSW (including a future text index, if it also uses a fairly complex algorithm?), so we probably want a lock mechanism that applies to all of them. I'll look into it over the next two days and open a new issue to discuss my idea and ask for suggestions if needed.
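
A minimal C++ sketch of a lock mechanism shared by all index types, assuming each updater can name the keys it will touch (for example the index name or HNSW node keys) before writing its batch. `StripedLocks` and `Guard` are hypothetical names, not existing kvrocks classes. The design choice here is lock striping, which keeps memory bounded regardless of key count, combined with ordered acquisition so that one update spanning several keys cannot deadlock against another:

```cpp
#include <algorithm>
#include <array>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical striped lock table that any index updater (HNSW, text, ...)
// could share: keys are hashed onto a fixed number of mutexes.
template <std::size_t kStripes = 256>
class StripedLocks {
 public:
  std::size_t StripeOf(const std::string& key) const {
    return std::hash<std::string>{}(key) % kStripes;
  }

  // RAII guard over several keys. Stripes are de-duplicated and locked in
  // ascending order, so updaters touching overlapping keys cannot deadlock.
  class Guard {
   public:
    Guard(StripedLocks& table, const std::vector<std::string>& keys) : table_(table) {
      for (const auto& key : keys) stripes_.push_back(table_.StripeOf(key));
      std::sort(stripes_.begin(), stripes_.end());
      stripes_.erase(std::unique(stripes_.begin(), stripes_.end()), stripes_.end());
      for (std::size_t s : stripes_) table_.mutexes_[s].lock();
    }
    ~Guard() {
      // Release in reverse acquisition order.
      for (auto it = stripes_.rbegin(); it != stripes_.rend(); ++it) table_.mutexes_[*it].unlock();
    }
    Guard(const Guard&) = delete;
    Guard& operator=(const Guard&) = delete;
    std::size_t NumLocked() const { return stripes_.size(); }

   private:
    StripedLocks& table_;
    std::vector<std::size_t> stripes_;
  };

 private:
  std::array<std::mutex, kStripes> mutexes_;
};
```

Locking like this only serializes writers; each batch would still need to be built from a consistent read of the node metadata, so it complements rather than replaces the snapshot consistency work.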