## Problem Statement

While testing with the setup below, I was seeing circuit breaker (CB) exceptions.

- Dataset: 1536 dimensions, 5M records
- 1 data node with 128 GB RAM and 16 vCPUs
- OpenSearch heap size: 32 GB, version 2.11.0
- Shards: 1
- Force merge segments to 1

Logs:
If CB is enabled:

```
2024-02-24 03:36:09,149 | INFO: Start optimize (opensearch.py:169) (372349)
2024-02-24 03:42:42,914 | WARNING: VectorDB optimize error: TransportError(429, 'circuit_breaking_exception', '[parent] Data too large, data for [<http_request>] would be [32224869544/30gb], which is larger than the limit of [31111669350/28.9gb], real usage: [32224869544/30gb], new bytes reserved: [0/0b], usages [request=0/0b, fielddata=0/0b, in_flight_requests=378/378b]') (task_runner.py:258) (371437)
2024-02-24 03:42:42,992 | WARNING: Failed to run performance case, reason = TransportError(429, 'circuit_breaking_exception', '[parent] Data too large, data for [<http_request>] would be [32224869544/30gb], which is larger than the limit of [31111669350/28.9gb], real usage: [32224869544/30gb], new bytes reserved: [0/0b], usages [request=0/0b, fielddata=0/0b, in_flight_requests=378/378b]') (task_runner.py:186) (371437)
```
If CB is not enabled:

```
WARN ][o.o.m.j.JvmGcMonitorService] [integTest-0] [gc][1433] overhead, spent [1.5s] collecting in the last [1.5s]
WARN ][o.o.m.j.JvmGcMonitorService] [integTest-0] [gc][1434] overhead, spent [1s] collecting in the last [1s]
ERROR][o.o.b.OpenSearchUncaughtExceptionHandler] [integTest-0] fatal error in thread [opensearch[integTest-0][refresh][T#1]], exiting
java.lang.OutOfMemoryError: Java heap space
    at org.opensearch.common.util.concurrent.ThreadContext.propagateTransients(ThreadContext.java:574) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.common.util.concurrent.ThreadContext.stashContext(ThreadContext.java:162) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:847) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
ERROR][o.o.b.OpenSearchUncaughtExceptionHandler] [integTest-0] fatal error in thread [opensearch[integTest-0][write][T#13]], exiting
java.lang.OutOfMemoryError: Java heap space
    at org.apache.lucene.util.ArrayUtil.copyOfSubArray(ArrayUtil.java:613) ~[lucene-core-9.7.0.jar:9.7.0 ccf4b198ec328095d45d2746189dc8ca633e8bcf - 2023-06-21 11:48:16]
    at org.apache.lucene.util.BytesRef.deepCopyOf(BytesRef.java:175) ~[lucene-core-9.7.0.jar:9.7.0 ccf4b198ec328095d45d2746189dc8ca633e8bcf - 2023-06-21 11:48:16]
    at org.opensearch.core.common.bytes.BytesArray.<init>(BytesArray.java:64) ~[opensearch-core-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.index.translog.TranslogWriter.assertNoSeqNumberConflict(TranslogWriter.java:345) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.index.translog.TranslogWriter.add(TranslogWriter.java:286) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.index.translog.Translog.add(Translog.java:571) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.index.translog.InternalTranslogManager.add(InternalTranslogManager.java:327) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.index.engine.InternalEngine.index(InternalEngine.java:1025) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.index.shard.IndexShard.index(IndexShard.java:1123) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:1068) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.index.shard.IndexShard.applyIndexOperationOnPrimary(IndexShard.java:959) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:619) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.action.bulk.TransportShardBulkAction$2.doRun(TransportShardBulkAction.java:466) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:530) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnPrimary(TransportShardBulkAction.java:411) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnPrimary(TransportShardBulkAction.java:123) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.action.support.replication.TransportWriteAction$1.doRun(TransportWriteAction.java:223) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:908) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
WARN ][o.o.k.i.c.K.KNN80DocValuesConsumer] [integTest-0] Refresh operation complete in 31371 ms
↓ last 40 non error or warning messages from /workplace/workspace/k-NN/build/testclusters/integTest-0/logs/opensearch.stdout.log ↓
[2024-03-01T08:51:09,072][INFO ][o.o.m.j.JvmGcMonitorService] [integTest-0] [gc][1366] overhead, spent [313ms] collecting in the last [1s]
[2024-03-01T08:51:52,157][INFO ][o.o.m.j.JvmGcMonitorService] [integTest-0] [gc][1409] overhead, spent [457ms] collecting in the last [1s]
[2024-03-01T08:52:22.500264Z] [BUILD] Stopping node
```
## Root Cause

The OOM exception/CB trips because, while creating the Faiss/Nmslib index at the segment level, we first load all the vectors (floats) into the JVM heap. Since vectors are 4-byte floats, this leads to an array of size ~28.6 GB ((4 * 1536 * 5,000,000)/2^30), and then the OOM. Ref: https://github.com/opensearch-project/k-NN/blob/main/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java#L42-L59

## Solution

The solution I am proposing is that, while reading the vectors from doc values, we stream/transfer the vectors to a memory address in native memory (RAM) and then pass that address to the JNI layer when creating indices for the native libraries (Faiss and Nmslib), rather than accumulating all the vectors in a heap list and passing that list to the JNI layer. I have done a POC implementation of the same here. This resolves the issue described at the start because only a bounded number of vectors is ever kept in heap, so no OOM or CB trip will happen.
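To make the proposal concrete, here is a minimal, hypothetical sketch of the JNI boundary (class, method, and parameter names are illustrative only, not the plugin's actual API). Java reads one chunk of vectors at a time from doc values and calls down with the address of a native buffer; the chunk is copied straight into native memory, so the heap only ever holds a single chunk, and the filled buffer is later handed to Faiss/Nmslib for index creation.

```
#include <jni.h>
#include <vector>

// Hypothetical JNI entry point: append one chunk of floats, received from Java,
// to a buffer that lives entirely in native memory.
extern "C" JNIEXPORT void JNICALL
Java_org_opensearch_knn_example_VectorTransfer_appendChunk(JNIEnv* env, jclass,
                                                           jlong bufferAddress,
                                                           jfloatArray chunk) {
    auto* buffer = reinterpret_cast<std::vector<float>*>(bufferAddress);
    jsize length = env->GetArrayLength(chunk);
    // Copy the Java float[] directly into the native buffer, with no intermediate heap copy.
    // The buffer's capacity is assumed to be reserved up front (see Approach 2 below),
    // so this insert does not allocate while the Java array is pinned.
    jfloat* data = static_cast<jfloat*>(env->GetPrimitiveArrayCritical(chunk, nullptr));
    buffer->insert(buffer->end(), data, data + length);
    env->ReleasePrimitiveArrayCritical(chunk, data, JNI_ABORT);
}
```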
## Critical Design Choices
#### How many vectors should we stream at once from Java to the JNI layer?
This is an interesting trade-off. We don't want to stream too many vectors to the JNI layer at once, because that can trigger GC when heap memory is under stress; we also don't want to stream too few, because that leads to more context switches and more fragmentation in native memory.
Approach 1
What I am proposing is that, instead of fixing the number of vectors, we should focus on the amount of data we stream, because that is what is actually being sent. Considering a typical production JVM heap of 32 GB, streaming 100 MB at a time is only ~0.3% of the whole heap.
Dimensions | Stream size in MB | Number of vectors that can be streamed | Number of vectors in segment | Number of trips required to send data to JNI | Approximate segment size with graphs + doc values (in GB) | Total number of floats | Total size of floats (in GB)
-- | -- | -- | -- | -- | -- | -- | --
128 | 100 | 204800 | 5000000 | 25 | 5.66244 | 640,000,000 | 2
256 | 100 | 102400 | 5000000 | 49 | 10.66923 | 1,280,000,000 | 5
512 | 100 | 51200 | 5000000 | 98 | 20.68281 | 2,560,000,000 | 10
768 | 100 | 34133 | 5000000 | 147 | 30.69639 | 3,840,000,000 | 14
1024 | 100 | 25600 | 5000000 | 196 | 40.70997 | 5,120,000,000 | 19
1536 | 100 | 17066 | 5000000 | 293 | 60.73713 | 7,680,000,000 | 29
Table 1: Providing details around segment size and vectors
From the table above we can see that, if we keep the amount of data sent per transfer constant, the number of trips to JNI increases as the dimension increases.
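The per-transfer vector count and trip count in Table 1 follow directly from the transfer size, the dimension, and the 4-byte float width; a small standalone illustration (not plugin code) for the 1536-dimension row:

```
#include <cstdint>
#include <cstdio>

int main() {
    const std::int64_t streamBytes = 100LL * 1024 * 1024;   // 100 MB per transfer
    const std::int64_t dimension = 1536;                     // floats per vector
    const std::int64_t vectorsInSegment = 5000000;

    const std::int64_t bytesPerVector = dimension * 4;                      // 6144 bytes (float32)
    const std::int64_t vectorsPerTransfer = streamBytes / bytesPerVector;   // 17066
    const std::int64_t trips =
        (vectorsInSegment + vectorsPerTransfer - 1) / vectorsPerTransfer;   // 293

    std::printf("vectorsPerTransfer=%lld, trips=%lld\n",
                static_cast<long long>(vectorsPerTransfer),
                static_cast<long long>(trips));
    return 0;
}
```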
The concern here is not the number of trips we make to JNI. The problem is that on every trip we append the floats to a C++ std::vector; if there is not enough memory to expand the vector in place, C++ copies the whole vector to a new memory location and then appends the new data to it (ref). This adds latency to the overall system. The benchmarks below show how much extra latency is added when the data is sent in batches compared to sending it all at once.
Benchmark | Total Number of Vectors to Transfer | Dimension | VectorsPerTransfer | Mode (Single Shot) | Cnt | Score | Units | Heap Used (in Mb) | Number of trips to JNI Layer
-- | -- | -- | -- | -- | -- | -- | -- | -- | --
TransferVectorsBenchmarks.transferVectors | 1M | 128 | 100000 | ss | 3 | 1.917 | s/op | 48.82813 | 10
TransferVectorsBenchmarks.transferVectors | 1M | 128 | 500000 | ss | 3 | 1.596 | s/op | 244.14063 | 2
TransferVectorsBenchmarks.transferVectors | 1M | 128 | 1000000 | ss | 3 | 1.443 | s/op | 488.28125 | 1
TransferVectorsBenchmarks.transferVectors | 1M | 256 | 100000 | ss | 3 | 3.89 | s/op | 97.65625 | 10
TransferVectorsBenchmarks.transferVectors | 1M | 256 | 500000 | ss | 3 | 3.143 | s/op | 488.28125 | 2
TransferVectorsBenchmarks.transferVectors | 1M | 256 | 1000000 | ss | 3 | 2.814 | s/op | 976.5625 | 1
TransferVectorsBenchmarks.transferVectors | 1M | 384 | 100000 | ss | 3 | 6.446 | s/op | 146.48438 | 10
TransferVectorsBenchmarks.transferVectors | 1M | 384 | 500000 | ss | 3 | 5.119 | s/op | 732.42188 | 2
TransferVectorsBenchmarks.transferVectors | 1M | 384 | 1000000 | ss | 3 | 4.568 | s/op | 1464.84375 | 1
TransferVectorsBenchmarks.transferVectors | 1M | 512 | 100000 | ss | 3 | 7.752 | s/op | 195.3125 | 10
TransferVectorsBenchmarks.transferVectors | 1M | 512 | 500000 | ss | 3 | 6.073 | s/op | 976.5625 | 2
TransferVectorsBenchmarks.transferVectors | 1M | 512 | 1000000 | ss | 3 | 5.297 | s/op | 1953.125 | 1
TransferVectorsBenchmarks.transferVectors | 1M | 768 | 100000 | ss | 3 | 12.577 | s/op | 292.96875 | 10
TransferVectorsBenchmarks.transferVectors | 1M | 768 | 500000 | ss | 3 | 9.723 | s/op | 1464.84375 | 2
TransferVectorsBenchmarks.transferVectors | 1M | 768 | 1000000 | ss | 3 | 8.663 | s/op | 2929.6875 | 1
TransferVectorsBenchmarks.transferVectors | 1M | 1024 | 100000 | ss | 3 | 15.406 | s/op | 390.625 | 10
TransferVectorsBenchmarks.transferVectors | 1M | 1024 | 500000 | ss | 3 | 11.838 | s/op | 1953.125 | 2
TransferVectorsBenchmarks.transferVectors | 1M | 1024 | 1000000 | ss | 3 | 10.306 | s/op | 3906.25 | 1
TransferVectorsBenchmarks.transferVectors | 1M | 1536 | 100000 | ss | 3 | 24.882 | s/op | 585.9375 | 10
TransferVectorsBenchmarks.transferVectors | 1M | 1536 | 500000 | ss | 3 | 19.091 | s/op | 2929.6875 | 2
TransferVectorsBenchmarks.transferVectors | 1M | 1536 | 1000000 | ss | 3 | 16.978 | s/op | 5859.375 | 1
Table 2: Benchmarking results when a fixed number of vectors are transferred without initial capacity being set.
Approach 2 (Recommended)
To ensure we are not adding any extra latency from the sizing and re-sizing of the std::vector, we can set an initial capacity for the vector. If we do so, no re-sizing happens and the extra latency is avoided. The benchmarks below run the same experiment, except that the std::vector expansion no longer happens.
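A minimal sketch of this on the native side (assumed helper names, not the plugin's actual code): the capacity is reserved once, before the first chunk arrives, so the std::vector never has to reallocate and re-copy the floats that were already transferred.

```
#include <cstddef>
#include <vector>

// Reserve the full capacity once, before streaming starts.
void prepareBuffer(std::vector<float>& dest, std::size_t totalVectors, std::size_t dimension) {
    dest.reserve(totalVectors * dimension);
}

// Append one chunk of floats received from Java. With the capacity reserved above,
// this only copies the new chunk; without reserve(), crossing the current capacity
// forces a reallocation plus a copy of everything already stored, which is the extra
// latency visible in Table 2.
void appendChunk(std::vector<float>& dest, const float* chunk, std::size_t chunkFloats) {
    dest.insert(dest.end(), chunk, chunk + chunkFloats);
}
```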
Benchmark | Total Number of Vectors to Transfer | Dimension | Vectors Per Transfer | Mode (Single Shot) | Cnt | Score | Units | Heap Used (in Mb) | Number of trips to JNI Layer
-- | -- | -- | -- | -- | -- | -- | -- | -- | --
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 100000 | ss | 3 | 0.688 | s/op | 48.82813 | 10
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 500000 | ss | 3 | 0.693 | s/op | 244.14063 | 2
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 1000000 | ss | 3 | 0.705 | s/op | 488.28125 | 1
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 100000 | ss | 3 | 1.195 | s/op | 97.65625 | 10
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 500000 | ss | 3 | 1.194 | s/op | 488.28125 | 2
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 1000000 | ss | 3 | 1.198 | s/op | 976.5625 | 1
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 100000 | ss | 3 | 1.727 | s/op | 146.48438 | 10
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 500000 | ss | 3 | 1.721 | s/op | 732.42188 | 2
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 1000000 | ss | 3 | 1.747 | s/op | 1464.84375 | 1
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 100000 | ss | 3 | 2.355 | s/op | 195.3125 | 10
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 500000 | ss | 3 | 2.336 | s/op | 976.5625 | 2
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 1000000 | ss | 3 | 2.364 | s/op | 1953.125 | 1
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 100000 | ss | 3 | 4.142 | s/op | 366.21094 | 10
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 500000 | ss | 3 | 4.154 | s/op | 1831.05469 | 2
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 1000000 | ss | 3 | 4.162 | s/op | 3662.10938 | 1
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 100000 | ss | 3 | 4.354 | s/op | 390.625 | 10
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 500000 | ss | 3 | 4.357 | s/op | 1953.125 | 2
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 1000000 | ss | 3 | 4.405 | s/op | 3906.25 | 1
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 100000 | ss | 3 | 6.423 | s/op | 585.9375 | 10
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 500000 | ss | 3 | 6.434 | s/op | 2929.6875 | 2
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 1000000 | ss | 3 | 6.457 | s/op | 5859.375 | 1
Table 3: Benchmarking results when a fixed number of vectors are transferred with initial capacity being set.
As we can see, by taking the expansion out of the picture and applying a few more optimizations that are only possible when the initial capacity is set (such as converting the Java float array directly into the vector rather than going through intermediate storage), we are able to reduce the time taken to move vectors from Java to the JNI layer by more than 50%.
#### How do we accurately find the number of vectors to stream to the JNI layer without reading all the vectors in the segment?
On closer examination, the DocValues interface provides a cost() function that returns the maximum number of documents present, but this count includes deleted documents too.
Segment creation/graph creation happens in two scenarios:
1. When an OpenSearch refresh happens
2. When segments are merged (automatic or force merge)
For #1, the number of vectors won't be high enough to cause OOM issues, so we can stream all the vectors to the JNI layer directly and don't need to depend on cost() to determine the number of vectors upfront.
For #2, merges involve large segments that may contain deleted docs, so cost() cannot be used as-is: it would lead to allocating a large chunk of memory that we never use for graph creation. To accurately find the number of live docs in the BinaryDocValues we can use the live-docs bits. Please refer to [this](https://github.com/navneet1v/k-NN/commit/e3b890c7efc215d5a6aa34629c521cf705deff72) POC code for reference. Sample pseudo code below:
```
import java.io.IOException;

import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;

class KNN80DocValuesReader {

    private final MergeState mergeState;

    KNN80DocValuesReader(MergeState mergeState) {
        this.mergeState = mergeState;
    }

    public BinaryDocValues getBinary(FieldInfo field) throws IOException {
        long liveDocsCount = 0;
        // Iterate over all the doc values producers present in the segments getting merged
        for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
            BinaryDocValues values = mergeState.docValuesProducers[i].getBinary(field);
            Bits liveDocs = mergeState.liveDocs[i];
            if (liveDocs != null) {
                // Deleted docs are present in this segment, so count only the live ones
                for (int docId = values.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = values.nextDoc()) {
                    if (liveDocs.get(docId)) {
                        liveDocsCount++;
                    }
                }
            } else {
                // No deleted docs, so every doc reported by cost() is live
                liveDocsCount += values.cost();
            }
        }
        // Finally, set the live docs count (and cost) on KNN80BinaryDocValues and use them
        // later while streaming the vectors
        return new KNN80BinaryDocValues(/* merged doc values iterator, liveDocsCount, ... */);
    }
}
```
So, with Approach 2 and an initial capacity set on the std::vector, we no longer have to pick an exact number of vectors to transfer and can instead make the transfers memory oriented. The benchmarks below show the impact on latency for different transfer sizes such as 100 MB, 200 MB, ... 500 MB. We can start with a default of 1% of the heap size (about 327 MB for a 32 GB heap), exposed as a cluster setting, which gives users the flexibility to change it in the future.
The benefit of a memory-based limit on vector transfer over a fixed count-based limit is that the system stays stable even as the number of shards on a node grows. For example, if we pick 100K vectors as the limit, then 50 shards of 1536-dimension vectors being built on a node can trip heap CBs, because the total heap required would be ~28.6 GB (a 100K batch of 1536-dimension float vectors is ~586 MB, and (586 * 50) / 1024 ≈ 28.6 GB).
Dimension | Vectors Per Transfer | Heap Used (in Mb)
-- | -- | --
128 | 100000 | 48.82813
128 | 500000 | 244.14063
128 | 1000000 | 488.28125
256 | 100000 | 97.65625
256 | 500000 | 488.28125
256 | 1000000 | 976.5625
384 | 100000 | 146.48438
384 | 500000 | 732.42188
384 | 1000000 | 1464.84375
512 | 100000 | 195.3125
512 | 500000 | 976.5625
512 | 1000000 | 1953.125
960 | 100000 | 366.21094
960 | 500000 | 1831.05469
960 | 1000000 | 3662.10938
1024 | 100000 | 390.625
1024 | 500000 | 1953.125
1024 | 1000000 | 3906.25
1536 | 100000 | 585.9375
1536 | 500000 | 2929.6875
1536 | 1000000 | 5859.375
Table 4: How heap usage changes with dimension for a fixed number of vectors per transfer.
Benchmarks with different size of data transfers

Benchmark | Total Number of Vectors to Transfer | Dimension | SizeOfVectorTransferInMb | Mode (Single Shot) | Cnt | Score | Units | Number of records transferred per trip | Number of Trips to JNI
-- | -- | -- | -- | -- | -- | -- | -- | -- | --
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 100 | ss | 3 | 0.694 | s/op | 204800 | 5
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 200 | ss | 3 | 0.685 | s/op | 409600 | 3
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 500 | ss | 3 | 0.702 | s/op | 1024000 | 1
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 100 | ss | 3 | 1.19 | s/op | 102400 | 10
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 200 | ss | 3 | 1.198 | s/op | 204800 | 5
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 500 | ss | 3 | 1.205 | s/op | 512000 | 2
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 100 | ss | 3 | 1.721 | s/op | 68266 | 15
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 200 | ss | 3 | 1.727 | s/op | 136533 | 8
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 500 | ss | 3 | 1.72 | s/op | 341333 | 3
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 100 | ss | 3 | 2.358 | s/op | 51200 | 20
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 200 | ss | 3 | 2.348 | s/op | 102400 | 10
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 500 | ss | 3 | 2.348 | s/op | 256000 | 4
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 100 | ss | 3 | 4.16 | s/op | 27306 | 37
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 200 | ss | 3 | 4.15 | s/op | 54613 | 19
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 500 | ss | 3 | 4.166 | s/op | 136533 | 8
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 100 | ss | 3 | 4.344 | s/op | 25600 | 40
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 200 | ss | 3 | 4.342 | s/op | 51200 | 20
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 500 | ss | 3 | 4.358 | s/op | 128000 | 8
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 100 | ss | 3 | 6.407 | s/op | 17066 | 59
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 200 | ss | 3 | 6.408 | s/op | 34133 | 30
TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 500 | ss | 3 | 6.31 | s/op | 85333 | 12

Table 5: Benchmarking results when a fixed amount of memory worth of vectors is transferred with initial capacity being set.

If we compare Table 5 with Table 3, we can see that the difference in latencies is minimal (< 100 ms). This provides strong evidence that we should use a fixed amount of memory for data transfer, which is more flexible and robust than transferring a fixed number of vectors.
#### Apart from vectors, should we also stream the docIds?
I think we should not stream the docIds. Some numbers: Lucene allows at most 2^31 - 1 docIds and an integer takes 4 bytes, so even in the absolute worst case holding every docId needs ~8 GB (4 * (2^31 - 1) / 2^30), and for realistic segment sizes the docIds are a tiny fraction of the vector data. Streaming them as well would be over-engineering the solution.
#### In native memory, should we store the vectors as a 1-D array or a 2-D array?
If we look at the create-index interfaces of Faiss and Nmslib, both of them take the vectors as a 1-D array. Hence it makes sense to use a 1-D array; otherwise we would need to construct the 1-D array from the 2-D array, which costs extra computation for large datasets.
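For illustration, a small sketch (assumed helper name, not the plugin's actual code) of keeping the native store 1-D: vector i is written at offset i * dimension, which is exactly the flat float layout Faiss/Nmslib expect, so no 2-D to 1-D conversion is needed at index-build time.

```
#include <cstring>
#include <cstddef>
#include <vector>

// flat must already be sized to totalVectors * dimension floats.
void writeVectorFlat(std::vector<float>& flat, std::size_t i, std::size_t dimension,
                     const float* vec) {
    std::memcpy(flat.data() + i * dimension, vec, dimension * sizeof(float));
}
```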
## Test Plan

Below is the list of tests/benchmarks that will be performed, in addition to unit tests and integration tests.

### Correctness Testing
The tests below will ensure that we are able to merge large datasets even when the heap is smaller than the vector data OpenSearch has to accommodate. The configurations below error out on OpenSearch version 2.13; they were chosen because we have seen errors with them recently.
Data set | Dimension | Data set size | Data Nodes Count | Data Node Type | Heap Size in GB | Number of Shards | Replicas | Max Number of Segments | Vectors Size (in GB) | Engine | Data Type | Algorithm
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
Cohere | 768 | 1000000 | 1 | r6g.2xlarge | 2 | 1 | 0 | 1 | 2.86102 | Nmslib | float32 | HNSW
Open AI | 1536 | 5000000 | 1 | r6g.4xlarge | 32 | 1 | 0 | 1 | 28.61023 | Nmslib | float32 | HNSW
SIFT | 128 | 1000000000 | 8 | r6g.12xlarge | 32 | 64 | 1 | 1 | 476.83716 | Nmslib | float32 | HNSW
Cohere | 768 | 1000000 | 1 | r6g.2xlarge | 2 | 1 | 0 | 1 | 2.86102 | Faiss | float32 | HNSW
Open AI | 1536 | 5000000 | 1 | r6g.4xlarge | 32 | 1 | 0 | 1 | 28.61023 | Faiss | float32 | HNSW
SIFT | 128 | 1000000000 | 8 | r6g.12xlarge | 32 | 64 | 1 | 1 | 476.83716 | Faiss | float16 | HNSW
### A/B Testing with OpenSearch version 2.13
We are going to use the nightly benchmarks to validate that this change does not cause any regression in the system. The main parameters we will look at are indexing time, force merge time, and refresh time; recall should also remain intact.
Currently the nightly benchmarks don't run training-related workloads, so we will run those separately. Details below.
Data set | Dimension | Data set size | Data Nodes Count | Data Node Type | Heap Size in GB | Number of Shards | Max Number of Segments | Vectors Size (in GB) | Engine | Data Type | Algorithm
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
SIFT | 128 | 1000000 | 3 | r5.4xlarge | 32 | 24 | 1 | 0.47684 | Faiss | float32 | HNSW-PQ
SIFT | 128 | 1000000 | 3 | r5.4xlarge | 32 | 24 | 1 | 0.47684 | Faiss | float32 | IVF
SIFT | 128 | 1000000 | 3 | r5.4xlarge | 32 | 24 | 1 | 0.47684 | Faiss | float32 | IVF-PQ
The experiment results will be compared with results here: https://github.com/opensearch-project/k-NN/issues/1473
## Tasks
- [x] Complete the RFC for the feature
- [X] Add benchmark code to k-NN plugin for all the benchmarks used in the RFC.
- [ ] Raise PR for the feature including UTs and ITs
- [ ] Benchmark with nightly runs to validate no regression