@jmazanec15 Hi Jack, please refer to section 5.5. [P1] Nmslib - Introducing Similar Concept of IOReader in FAISS for the future changes that I will make for NMSLIB.
NMSLIB patch was merged - https://github.com/opensearch-project/k-NN/pull/2144 Will raise a new PR for stream support in NMSLIB.
Current PR : https://github.com/opensearch-project/k-NN/pull/2139
Hi Navneet, I ran a benchmark (covering both bulk ingestion and searching) and could not find any evidence of a memory peak during searching. Please let me know if it looks good to you so we can merge this! Thank you.
After PR #2139 is merged, two more PRs will follow shortly.
After those two PRs, the loading layer will officially be available in OpenSearch. For the writing layer, we still need to tune performance: as the POC benchmark shows, it is likely to cause a 20% performance degradation, and we need to fix that. Once the performance issue is fixed, we can raise a single PR to introduce the writing layer in both native engines.
PR for introducing a loading layer in NMSLIB https://github.com/opensearch-project/k-NN/pull/2185
From a memory standpoint, I could not observe any major changes in the benchmark.
The numbers below were measured through `time curl -X GET http://localhost:9200/_plugins/_knn/warmup/target_index`.

I made two different experiments of loading a FAISS vector index with different buffer sizes (the system page cache was dropped between runs via `sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches'`).

Unlike FAISS, it took almost 81% more time when loading a system-cached file. Of course, this case will be rare, as k-NN is expected to load a vector index whenever a new segment file is baked, and a newly baked segment file is unlikely to be system-cached. Increasing the buffer size didn't help; we need to find a better way to transfer data from JNI to Java.
Index size : 30G
Machine : c5ad.12xlarge
JVM Args : -Xms63G -Xmx63G
Data : random-s-128-10m-euclidean.hdf5
Metric | Task | Baseline-Value | Candidate-Value | Change | Unit
---|---|---|---|---|---
Cumulative indexing time of primary shards | | 33.7147 | 34.2115 | 1.47% | min
Min cumulative indexing time across primary shards | | 0.000133333 | 0.00015 | 12.50% | min
Median cumulative indexing time across primary shards | | 16.8573 | 17.1057 | 1.47% | min
Max cumulative indexing time across primary shards | | 33.7146 | 34.2113 | 1.47% | min
Cumulative indexing throttle time of primary shards | | 0 | 0 | 0.00% | min
Min cumulative indexing throttle time across primary shards | | 0 | 0 | 0.00% | min
Median cumulative indexing throttle time across primary shards | | 0 | 0 | 0.00% | min
Max cumulative indexing throttle time across primary shards | | 0 | 0 | 0.00% | min
Cumulative merge time of primary shards | | 282.601 | 282.996 | 0.14% | min
Cumulative merge count of primary shards | | 125 | 122 | 2.40% |
Min cumulative merge time across primary shards | | 0 | 0 | 0.00% | min
Median cumulative merge time across primary shards | | 141.3 | 141.498 | 0.14% | min
Max cumulative merge time across primary shards | | 282.601 | 282.996 | 0.14% | min
Cumulative merge throttle time of primary shards | | 1.04818 | 1.61307 | 53.89% | min
Min cumulative merge throttle time across primary shards | | 0 | 0 | 0.00% | min
Median cumulative merge throttle time across primary shards | | 0.524092 | 0.806533 | 53.89% | min
Max cumulative merge throttle time across primary shards | | 1.04818 | 1.61307 | 53.89% | min
Cumulative refresh time of primary shards | | 1.1042 | 1.14667 | 3.85% | min
Cumulative refresh count of primary shards | | 88 | 85 | 3.41% |
Min cumulative refresh time across primary shards | | 0.000333333 | 0.000383333 | 15.00% | min
Median cumulative refresh time across primary shards | | 0.5521 | 0.573333 | 3.85% | min
Max cumulative refresh time across primary shards | | 1.10387 | 1.14628 | 3.84% | min
Cumulative flush time of primary shards | | 11.5126 | 10.9446 | 4.93% | min
Cumulative flush count of primary shards | | 56 | 53 | 5.36% |
Min cumulative flush time across primary shards | | 0 | 0 | 0.00% | min
Median cumulative flush time across primary shards | | 5.75628 | 5.47229 | 4.93% | min
Max cumulative flush time across primary shards | | 11.5126 | 10.9446 | 4.93% | min
Total Young Gen GC time | | 0.338 | 0.342 | 1.18% | s
Total Young Gen GC count | | 19 | 19 | 0.00% |
Total Old Gen GC time | | 0 | 0 | 0.00% | s
Total Old Gen GC count | | 0 | 0 | 0.00% |
Store size | | 29.8586 | 29.8584 | 0.00% | GB
Translog size | | 5.83E-07 | 5.83E-07 | 0.00% | GB
Heap used for segments | | 0 | 0 | 0.00% | MB
Heap used for doc values | | 0 | 0 | 0.00% | MB
Heap used for terms | | 0 | 0 | 0.00% | MB
Heap used for norms | | 0 | 0 | 0.00% | MB
Heap used for points | | 0 | 0 | 0.00% | MB
Heap used for stored fields | | 0 | 0 | 0.00% | MB
Segment count | | 2 | 2 | 0.00% |
Min Throughput | custom-vector-bulk | 5390.31 | 5466.23 | 1.41% | docs/s |
Mean Throughput | custom-vector-bulk | 11041.1 | 10866 | 1.59% | docs/s |
Median Throughput | custom-vector-bulk | 10377.9 | 10065.6 | 3.01% | docs/s |
Max Throughput | custom-vector-bulk | 20105.1 | 19337.8 | 3.82% | docs/s |
50th percentile latency | custom-vector-bulk | 78.4349 | 75.8132 | 3.34% | ms |
90th percentile latency | custom-vector-bulk | 165.667 | 158.129 | 4.55% | ms |
99th percentile latency | custom-vector-bulk | 331.043 | 318.269 | 3.86% | ms |
99.9th percentile latency | custom-vector-bulk | 1486.47 | 1487.08 | 0.04% | ms |
99.99th percentile latency | custom-vector-bulk | 2300.48 | 2598.04 | 12.93% | ms |
100th percentile latency | custom-vector-bulk | 5049.72 | 4535.08 | 10.19% | ms |
50th percentile service time | custom-vector-bulk | 78.4349 | 75.8132 | 3.34% | ms |
90th percentile service time | custom-vector-bulk | 165.667 | 158.129 | 4.55% | ms |
99th percentile service time | custom-vector-bulk | 331.043 | 318.269 | 3.86% | ms |
99.9th percentile service time | custom-vector-bulk | 1486.47 | 1487.08 | 0.04% | ms |
99.99th percentile service time | custom-vector-bulk | 2300.48 | 2598.04 | 12.93% | ms |
100th percentile service time | custom-vector-bulk | 5049.72 | 4535.08 | 10.19% | ms |
error rate | custom-vector-bulk | 0 | 0 | 0.00% | % |
Min Throughput | force-merge-segments | 0 | 0 | 0.00% | ops/s |
Mean Throughput | force-merge-segments | 0 | 0 | 0.00% | ops/s |
Median Throughput | force-merge-segments | 0 | 0 | 0.00% | ops/s |
Max Throughput | force-merge-segments | 0 | 0 | 0.00% | ops/s |
100th percentile latency | force-merge-segments | 1.16E+07 | 1.13E+07 | 2.84% | ms |
100th percentile service time | force-merge-segments | 1.16E+07 | 1.13E+07 | 2.84% | ms |
error rate | force-merge-segments | 0 | 0 | 0.00% | % |
Min Throughput | warmup-indices | 0.24 | 0.14 | 41.67% | ops/s |
Mean Throughput | warmup-indices | 0.24 | 0.14 | 41.67% | ops/s |
Median Throughput | warmup-indices | 0.24 | 0.14 | 41.67% | ops/s |
Max Throughput | warmup-indices | 0.24 | 0.14 | 41.67% | ops/s |
100th percentile latency | warmup-indices | 4162.87 | 7127.78 | 71.22% | ms |
100th percentile service time | warmup-indices | 4162.87 | 7127.78 | 71.22% | ms |
error rate | warmup-indices | 0 | 0 | 0.00% | % |
Min Throughput | prod-queries | 0.66 | 0.64 | 3.03% | ops/s |
Mean Throughput | prod-queries | 0.66 | 0.64 | 3.03% | ops/s |
Median Throughput | prod-queries | 0.66 | 0.64 | 3.03% | ops/s |
Max Throughput | prod-queries | 0.66 | 0.64 | 3.03% | ops/s |
50th percentile latency | prod-queries | 3.5832 | 3.83349 | 6.99% | ms |
90th percentile latency | prod-queries | 4.75317 | 4.64172 | 2.34% | ms |
99th percentile latency | prod-queries | 22.1628 | 23.8439 | 7.59% | ms |
100th percentile latency | prod-queries | 1508.36 | 1571.86 | 4.21% | ms |
50th percentile service time | prod-queries | 3.5832 | 3.83349 | 6.99% | ms |
90th percentile service time | prod-queries | 4.75317 | 4.64172 | 2.34% | ms |
99th percentile service time | prod-queries | 22.1628 | 23.8439 | 7.59% | ms |
100th percentile service time | prod-queries | 1508.36 | 1571.86 | 4.21% | ms |
error rate | prod-queries | 0 | 0 | 0.00% | % |
Mean recall@k | prod-queries | 0.42 | 0.43 | 2.38% | |
Mean recall@1 | prod-queries | 0.6 | 0.63 | 5.00% |
After switching from direct file API usage to an abstract IO loading layer, additional overhead was introduced due to JNI calls and buffer copying via std::memcpy. This change resulted in a 30% increase in loading time compared to the baseline in FAISS. The baseline took 3.584 seconds to load a 6GB vector index, while the modified version increased the load time to 4.664 seconds.
In NMSLIB, we expected a similar level of performance regression as seen in FAISS. However, we're observing a 70% increase in load time when loading a 6GB vector index. (baseline=4.144 sec, the modified one=7.503 sec) Why is the performance impact in NMSLIB more than twice as severe as in FAISS?
The key performance difference in index loading between FAISS and NMSLIB stems from their file formats. In NMSLIB, this difference results in JNI calls being made O(N) times, where N is the number of vectors, whereas in FAISS, the number of JNI calls is O(1).
FAISS stores chunks of the neighbor list in a single location and loads them all at once. See the code below:
static void read_HNSW(HNSW* hnsw, IOReader* f) {
READVECTOR(hnsw->assign_probas);
READVECTOR(hnsw->cum_nneighbor_per_level);
READVECTOR(hnsw->levels);
READVECTOR(hnsw->offsets);
READVECTOR(hnsw->neighbors);
READ1(hnsw->entry_point);
READ1(hnsw->max_level);
READ1(hnsw->efConstruction);
READ1(hnsw->efSearch);
READ1(hnsw->upper_beam);
}
In NMSLIB, each neighbor list is stored individually, requiring O(N) reads, where N is the total number of vectors. As shown in the code below, we need totalElementsStored_ read operations. Note that input.read() ultimately calls JNI to delegate the read to Lucene's IndexInput, thanks to the introduced loading layer. As a result, the number of input.read() calls directly corresponds to the number of JNI calls.
for (size_t i = 0; i < totalElementsStored_; i++) {
...
} else {
linkLists_[i] = (char *)malloc(linkListSize);
CHECK(linkLists_[i]);
input.read(linkLists_[i], linkListSize); <--------- THIS!
}
data_rearranged_[i] = new Object(data_level0_memory_ + (i)*memoryPerObject_ + offsetData_);
}
We can patch NMSLIB to avoid making JNI calls for each vector element. The idea is to load data in bulk, then parse the neighbor lists from that buffer, rather than reading bytes individually. This approach would reduce the number of JNI calls to O(Index size / Buffer size).
For example, with a 6GB vector index containing 1 million vectors and a 64KB buffer size, the required JNI calls would be reduced to O(6GB / 64KB) = 98,304, which is a significant improvement over 1 million calls, achieving nearly a 90% reduction in operations.
Result: Surprisingly, it is 8% faster than the baseline. (Note: I reindexed on a new single node, which is why the loading time differs from the one mentioned earlier in the issue.)
template <typename dist_t>
void Hnsw<dist_t>::LoadOptimizedIndex(NmslibIOReader& input) {
...
const size_t bufferSize = 64 * 1024; // 64KB
std::unique_ptr<char[]> buffer (new char[bufferSize]);
uint32_t end = 0;
uint32_t pos = 0;
const bool isLTE = _isLittleEndian();
for (size_t i = 0, remainingBytes = input.remaining(); i < totalElementsStored_; i++) {
// Read linkList size integer.
if ((pos + sizeof(SIZEMASS_TYPE)) >= end) {
// Underflow, load bytes in bulk.
const auto firstPartLen = end - pos;
if (firstPartLen > 0) {
std::memcpy(buffer.get(), buffer.get() + pos, firstPartLen);
}
const auto copyBytes = std::min(remainingBytes, bufferSize - firstPartLen);
input.read(buffer.get() + firstPartLen, copyBytes);
remainingBytes -= copyBytes;
end = copyBytes + firstPartLen;
pos = 0;
}
// Read data size. SIZEMASS_TYPE -> uint32_t
SIZEMASS_TYPE linkListSize = 0;
if (isLTE) {
linkListSize = _readIntLittleEndian(buffer[pos], buffer[pos + 1], buffer[pos + 2], buffer[pos + 3]);
} else {
linkListSize = _readIntBigEndian(buffer[pos], buffer[pos + 1], buffer[pos + 2], buffer[pos + 3]);
}
pos += 4;
if (linkListSize == 0) {
linkLists_[i] = nullptr;
} else {
// Now we load neighbor list.
linkLists_[i] = (char *) malloc(linkListSize);
CHECK(linkLists_[i]);
SIZEMASS_TYPE leftLinkListData = linkListSize;
auto dataPtr = linkLists_[i];
while (leftLinkListData > 0) {
if (pos >= end) {
// Underflow, load bytes in bulk.
const auto copyBytes = std::min(remainingBytes, bufferSize);
input.read(buffer.get(), copyBytes);
remainingBytes -= copyBytes;
end = copyBytes;
pos = 0;
}
const auto copyBytes = std::min(leftLinkListData, end - pos);
std::memcpy(dataPtr, buffer.get() + pos, copyBytes);
dataPtr += copyBytes;
leftLinkListData -= copyBytes;
pos += copyBytes;
} // End while
} // End if
data_rearranged_[i] = new Object(data_level0_memory_ + (i)*memoryPerObject_ + offsetData_);
} // End for
...
Since we're deprecating NMSLIB in version 3.x, we can disable the loading layer in NMSLIB until then. Or, we can selectively allow streaming in NMSLIB depending on whether the given Directory is an FSDirectory implementation. For example:
if (directory instanceof FSDirectory) {
  loadIndexByFilePath(...);
} else {
  loadIndexByStreaming(...);
}
Since we're deprecating NMSLIB in version 3.x, we can tolerate this issue in the short term. However, I personally don't favor this approach, as it impacts the p99 latency metrics, which are rare but could still affect overall cluster performance in the worst case.
We decided to go with solution 1 for the NMSLIB PR - https://github.com/opensearch-project/k-NN/pull/2185
Because the issue is linked with the PR, the issue also gets closed when the PR is closed, which we don't want. Thanks @dblock for removing the untriaged label.
@0ctopus13prime just checking, what items are left for this feature to complete?
@navneet1v The writing part for the native engines is what's left! I'm stuck on deprecating FileWatcher, but we can add the writing layer first.
This analysis report explores the potential impact of replacing the std::ofstream I/O mechanism with a writing layer in native engines (FAISS, NMSLIB). In this new approach, all I/O writing relies on an interface that internally uses Lucene's IndexOutput to manage byte flushing. Depending on the provided Directory, it can flush bytes to the host's file system or to S3. The introduction of additional logic for handling bytes, such as multiple JNI calls and virtual calls, may introduce some latency. However, the extent of this latency increase is currently unknown. This document details the methodology I used to reach my conclusion, with the aim of aligning on the understanding that the writing layer will not cause noticeable performance degradation.
After multiple rounds of benchmarks and deep-dive impact analysis, I concluded that the writing layer's contribution to latency overhead is minimal, accounting for at most 1% of the shard indexing time. While some sections of the benchmark results suggested potential performance degradation due to the change, repeated benchmarks indicated that those inconsistent numbers are likely noise. Therefore, I do not expect severe performance degradation from the writing layer.
What proportion does the writing layer contribute to the overall shard-level indexing process? The vector indexing process within each shard consists of three stages:

1. Transferring vectors from Java to the native engine.
2. Constructing the vector index in memory.
3. Flushing the constructed index to storage. Today std::ofstream is being used here; the writing layer sits at this stage.

I created a standalone program to load prepared vector data and feed it to the JNIService, which manages the vector index as a whole. In my standalone testing, I used 1 million vectors, each with 128 dimensions (random-s-128-10m-euclidean.hdf5). In this program, I tested the following three scenarios:

1. Baseline: use std::fstream to flush the vector index.
2. Streaming: use the writing layer, which flushes bytes through Lucene's IndexOutput.
3. Hybrid: an IOWriter implementation that uses std::fstream internally to write bytes directly to a file without relying on IndexOutput. This implementation can only be used when an FSDirectory is provided.

The total indexing time was 5 minutes and 50 seconds, with only 1 second (this is the worst case I can imagine; the actual numbers I got were less than 1 second) spent on flushing the vector index (the final step), which accounts for 0.2% of the total time. This indicates that the majority of the time in vector indexing is spent constructing the in-memory vector index, while only 0.2% is allocated to I/O processing. Even though flushing takes more than twice as long after the introduction of the writing layer, its overall impact is marginal, resulting in a total time of 5 minutes and 51 seconds.
Test case | Time (µs) | Diff vs. baseline
---|---|---
Baseline | 450436 |
Streaming | 656856 | 0.45827
Hybrid | 457356 | 0.01513
The resulting file size is 634 MB. The baseline took approximately 0.45 seconds to flush, while the streaming approach took 0.65 seconds. The hybrid approach was nearly identical to the baseline, taking 0.457 seconds. As observed, there was indeed an increase in flushing time from 0.45 seconds to 0.65 seconds. However, since both vector transfer and constructing the vector index account for 99.8% of the total time, I anticipate that the writing layer will have minimal impact on overall performance.
I excluded sections from the benchmark results according to the conditions below.
From the results, I can see that cumulative indexing time and merge time are almost identical between the baseline and the writing layer. Only p100 latency is likely impacted by the writing layer.
Note that the values in the writing layer represent the average of five benchmark results.
Metric | Task | Baseline | Writing Layer | Diff | Unit |
---|---|---|---|---|---|
Min Throughput | custom-vector-bulk | 5793.51 | 5225.818 | -0.10863 | docs/s |
Max Throughput | custom-vector-bulk | 19347.1 | 17716.92 | -0.09201 | docs/s |
100th percentile latency | custom-vector-bulk | 3157.52 | 4727.68 | 0.33212 | ms |
Surprisingly, the overall performance has improved compared to the baseline. I believe this enhancement is due to the use of 64 KB I/O buffering for writing, as opposed to the 4 KB buffer size used by std::ofstream.
I applied the same filtering rule mentioned in Faiss to exclude specific sections. Note that the values in the writing layer represent the average of three benchmark results.
Metric | Task | Baseline | Candidate-1 | Diff | Unit |
---|---|---|---|---|---|
Cumulative merge throttle time of primary shards | | 1.73867 | 1.33853 | -0.23014 | min
Median cumulative merge throttle time across primary shards | | 0.86933 | 0.66926 | -0.23014 | min
Max cumulative merge throttle time across primary shards | | 1.73867 | 1.33853 | -0.23014 | min
Min Throughput | custom-vector-bulk | 3279.76 | 4316.82 | 0.3162 | docs/s |
99.99th percentile latency | custom-vector-bulk | 3193.2 | 2751.545 | -0.3245 | ms |
100th percentile latency | custom-vector-bulk | 6304.56 | 4634.195 | -0.35049 | ms |
Based on the results, I believe we can safely go with the writing layer. However, if we want to be more conservative, we can adopt the hybrid approach (see Case 3. Use hybrid approach). In this approach, we implement the IOWriter interface but use std::ofstream to flush bytes when an FSDirectory is provided.
From the micro-performance testing, we can see that the time spent is nearly identical to the baseline (baseline = 0.45 sec, hybrid = 0.457 sec). One of the greatest advantages of this approach is that it allows us to have a single codebase for I/O operations using the writing layer, while also being simple and easy to implement. Additionally, it guarantees users an identical indexing experience.
Overall, I don't see any downsides to this approach.
public static void main(String... args) throws IOException {
final String dataPath = "/home/ec2-user/dump_to_print/data.json";
final String tmpDirectory = "tmp-" + UUID.randomUUID();
final Directory directory = new MMapDirectory(Path.of(tmpDirectory));
final int numData = 10000;
final int dim = 128;
Map<String, Object> parameters = new HashMap<>();
parameters.put("name", "hnsw");
parameters.put("data_type", "float");
parameters.put("index_description", "HNSW16,Flat");
parameters.put("spaceType", "l2");
Map<String, Object> innerParameters = new HashMap<>();
innerParameters.put("ef_search", 10);
innerParameters.put("ef_construction", 100);
Map<String, Object> encoderParameters = new HashMap<>();
encoderParameters.put("name", "flat");
encoderParameters.put("parameters", Collections.emptyMap());
innerParameters.put("encoder", encoderParameters);
parameters.put("parameters", innerParameters);
parameters.put("indexThreadQty", 1);
final String fullPath = tmpDirectory + "/output";
try (final IndexOutput indexOutput = directory.createOutput("output", IOContext.DEFAULT)) {
System.out.println("Output : " + tmpDirectory + "/output");
IndexOutputWithBuffer indexOutputWithBuffer = new IndexOutputWithBuffer(indexOutput);
FaissService.kdyBench(numData, dim, dataPath, parameters, indexOutputWithBuffer, fullPath);
// For NMSLIB testing, uncomment below.
// NmslibService.kdyBench(numData, dim, dataPath, parameters, indexOutputWithBuffer);
}
System.out.println("OUT!!!!!!!!");
}
import h5py
import json
# file_name = 'sift-128-euclidean.hdf5'
file_name = 'random-s-128-10m-euclidean.hdf5'
f = h5py.File(file_name, 'r')
data = f['train']
L = min(1000000, len(data))
for i in range(L):
print(json.dumps(data[i].tolist()))
f.close()
//////////////////////////////////////
// Result
//////////////////////////////////////
[5.838372982574612, 2.711372391128072, 5.014160838955179, -8.336033892368915, ...]
std::cout << "Stream!!!!!!!!" << std::endl;
auto start = std::chrono::high_resolution_clock::now();
knn_jni::faiss_wrapper::WriteIndex(&jniUtil, env, indexOutput, indexAddress, &indexService);
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::micro> duration = end - start;
std::cout << "Stream file version -> Execution time: " << duration.count() << " microseconds" << std::endl;
Baseline using std::ofstream:
std::cout << "FStream!!!!!!" << std::endl;
const std::string destPath = jniUtil.ConvertJavaStringToCppString(env, fullPathj);
auto start = std::chrono::high_resolution_clock::now();
knn_jni::faiss_wrapper::WriteIndexLegacy(&jniUtil, env, destPath, indexAddress, &indexService);
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::micro> duration = end - start;
std::cout << "Legacy file version -> Execution time: " << duration.count() << " microseconds" << std::endl;
const std::string destPath = jniUtil.ConvertJavaStringToCppString(env, fullPathj);
auto start = std::chrono::high_resolution_clock::now();
knn_jni::faiss_wrapper::WriteIndexKdy(&jniUtil, env, destPath, indexAddress, &indexService);
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::micro> duration = end - start;
std::cout << "Kdy Hybrid Stream file version -> Execution time: "
<< duration.count() << " microseconds" << std::endl;
void knn_jni::faiss_wrapper::WriteIndexKdy(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env,
const std::string& path, jlong index_ptr, IndexService* indexService) {
knn_jni::stream::KdyFaissIOWriter writer {path};
// Write the index through the hybrid writer
indexService->writeIndex(&writer, index_ptr);
}
class KdyFaissIOWriter final : public faiss::IOWriter {
public:
explicit KdyFaissIOWriter(const std::string& _path)
: faiss::IOWriter(),
path(_path),
stream(_path, std::ios::binary) {
name = "FaissOpenSearchIOWriter";
}
size_t operator()(const void *ptr, size_t size, size_t nitems) final {
const auto writeBytes = size * nitems;
if (writeBytes > 0) {
stream.write((char*) ptr, writeBytes);
}
return nitems;
}
// return a file number that can be memory-mapped
int filedescriptor() final {
throw std::runtime_error("filedescriptor() is not supported in KdyFaissIOWriter.");
}
private:
std::string path;
std::ofstream stream;
}; // class KdyFaissIOWriter
std::unique_ptr<knn_jni::faiss_wrapper::FaissMethods> faissMethods(
new knn_jni::faiss_wrapper::FaissMethods());
knn_jni::faiss_wrapper::IndexService indexService(std::move(faissMethods));
const std::string dataPath = jniUtil.ConvertJavaStringToCppString(env, inputDataPathJ);
std::ifstream in (dataPath);
std::string line;
std::vector<float> vectors;
int64_t id = 0;
std::vector<int64_t> ids;
while (std::getline(in, line)) {
int s = 1;
for (int i = s ; i < line.size() ; ) {
while (line[i] != ',' && line[i] != ']') {
++i;
}
while (s < line.size() && line[s] == ' ') {
++s;
}
std::string value = line.substr(s, (i - s));
const float fvalue = std::stod(value);
vectors.push_back(fvalue);
s = ++i;
}
ids.push_back(id++);
}
numData = ids.size();
std::cout << "dim=" << dim
<< ", numData=" << numData
<< ", |vectors|=" << vectors.size() << std::endl;
indexService.insertToIndex(dim, numData, 1,
(int64_t) (&vectors),
ids,
indexAddress);
std::cout << "Insert to index is done!" << std::endl;
@navneet1v I think we can safely go with the writing layer. From the analysis, it only affects about 1% of the total shard vector indexing time. I ran multiple rounds of benchmarks and micro-benchmarks, and overall the degradations shown in the benchmark numbers are likely noise; I don't believe those drops are caused by the writing layer.
Please feel free to share your thoughts. Once we're aligned that it is safe to proceed, I will work on carving down the code and continue adding integration and unit tests.
Thank you!
@0ctopus13prime thanks for sharing the results. I am aligned with going with the writing layer and not building a hybrid approach. The minimal degradation we are seeing is just noise, which is very prominent with indexing in general. Thanks for sharing the detailed analysis. From my side, I would say let's write the code.
In a separate PR, please also include these micro-benchmarks in the repo so they can be used later.
From the results, total cumulative indexing time is expected to increase by up to 2% (67.67 min -> 68.98 min), which means bulk indexing throughput can decrease by about 1.4% (7861 -> 7744 docs/s).
Metric | Task | Baseline | Candidate | Diff | Unit
---|---|---|---|---|---
Cumulative indexing time of primary shards | | 67.6766 | 68.9868 | 0.01935972 | min
Min cumulative indexing time across primary shards | | 0 | 0 | 0 | min
Median cumulative indexing time across primary shards | | 10.92455 | 11.3572 | 0.039603462 | min
Max cumulative indexing time across primary shards | | 22.9782 | 23.4515 | 0.020597784 | min
Cumulative indexing throttle time of primary shards | | 0 | 0 | 0 | min
Min cumulative indexing throttle time across primary shards | | 0 | 0 | 0 | min
Median cumulative indexing throttle time across primary shards | | 0 | 0 | 0 | min
Max cumulative indexing throttle time across primary shards | | 0 | 0 | 0 | min
Cumulative merge time of primary shards | | 455.27 | 463.033 | 0.01705142 | min
Cumulative merge count of primary shards | | 628 | 631 | 0.00477707 |
Min cumulative merge time across primary shards | | 0 | 0 | 0 | min
Median cumulative merge time across primary shards | | 74.92695 | 75.6642 | 0.009839584 | min
Max cumulative merge time across primary shards | | 154.315 | 158.499 | 0.027113372 | min
Cumulative merge throttle time of primary shards | | 25.78985 | 22.7165 | -0.119168975 | min
Min cumulative merge throttle time across primary shards | | 0 | 0 | 0 | min
Median cumulative merge throttle time across primary shards | | 3.979025 | 3.18752 | -0.198919333 | min
Max cumulative merge throttle time across primary shards | | 9.53849 | 8.29422 | -0.130447272 | min
Cumulative refresh time of primary shards | | 0.8357415 | 0.8457 | 0.011915766 | min
Cumulative refresh count of primary shards | | 475.5 | 473 | -0.005257624 |
Min cumulative refresh time across primary shards | | 0 | 0 | 0 | min
Median cumulative refresh time across primary shards | | 0.134925 | 0.132867 | -0.015252918 | min
Max cumulative refresh time across primary shards | | 0.2874665 | 0.302833 | 0.053454924 | min
Cumulative flush time of primary shards | | 15.55735 | 15.5443 | -0.000838832 | min
Cumulative flush count of primary shards | | 350 | 349 | -0.002857143 |
Min cumulative flush time across primary shards | | 0 | 0 | 0 | min
Median cumulative flush time across primary shards | | 2.5186 | 2.5161 | -0.000992615 | min
Max cumulative flush time across primary shards | | 5.39728 | 5.36828 | -0.005373077 | min
Total Young Gen GC time | | 1.469 | 1.706 | 0.161334241 | s
Total Young Gen GC count | | 160 | 174 | 0.0875 |
Total Old Gen GC time | | 0 | 0 | 0 | s
Total Old Gen GC count | | 0 | 0 | 0 |
Store size | | 340.746 | 340.744 | -5.86947E-06 | GB
Translog size | | 1.43983E-06 | 1.20E-06 | -0.168176688 | GB
Heap used for segments | | 0 | 0 | 0 | MB
Heap used for doc values | | 0 | 0 | 0 | MB
Heap used for terms | | 0 | 0 | 0 | MB
Heap used for norms | | 0 | 0 | 0 | MB
Heap used for points | | 0 | 0 | 0 | MB
Heap used for stored fields | | 0 | 0 | 0 | MB
Segment count | | 5.5 | 5 | -0.090909091 |
Min Throughput | custom-vector-bulk | 6584.245 | 5835.71 | -0.113685776 | docs/s
Mean Throughput | custom-vector-bulk | 7861.24 | 7744.98 | -0.014789015 | docs/s
Median Throughput | custom-vector-bulk | 7596.51 | 7518.42 | -0.010279721 | docs/s
Max Throughput | custom-vector-bulk | 10233.015 | 9612.79 | -0.060610192 | docs/s
50th percentile latency | custom-vector-bulk | 72.44355 | 73.3195 | 0.012091484 | ms
90th percentile latency | custom-vector-bulk | 151.5965 | 155.167 | 0.023552655 | ms
99th percentile latency | custom-vector-bulk | 260.645 | 257.376 | -0.012541963 | ms
99.9th percentile latency | custom-vector-bulk | 378.015 | 381.875 | 0.010211235 | ms
99.99th percentile latency | custom-vector-bulk | 608.632 | 932.898 | 0.532778428 | ms
100th percentile latency | custom-vector-bulk | 1080.8535 | 3556.3 | 2.290270143 | ms
50th percentile service time | custom-vector-bulk | 72.44355 | 73.3195 | 0.012091484 | ms
90th percentile service time | custom-vector-bulk | 151.5965 | 155.167 | 0.023552655 | ms
99th percentile service time | custom-vector-bulk | 260.645 | 257.376 | -0.012541963 | ms
99.9th percentile service time | custom-vector-bulk | 378.015 | 381.875 | 0.010211235 | ms
99.99th percentile service time | custom-vector-bulk | 608.632 | 932.898 | 0.532778428 | ms
100th percentile service time | custom-vector-bulk | 1080.8535 | 3556.3 | 2.290270143 | ms
error rate | custom-vector-bulk | 0 | 0 | 0 | %
Min Throughput | force-merge-segments | 0 | 0 | 0 | ops/s
Mean Throughput | force-merge-segments | 0 | 0 | 0 | ops/s
Median Throughput | force-merge-segments | 0 | 0 | 0 | ops/s
Max Throughput | force-merge-segments | 0 | 0 | 0 | ops/s
100th percentile latency | force-merge-segments | 11015600 | 1.13E+07 | 0.026789281 | ms
100th percentile service time | force-merge-segments | 11015600 | 1.13E+07 | 0.026789281 | ms
error rate | force-merge-segments | 0 | 0 | 0 | %
Min Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s
Mean Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s
Median Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s
Max Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s
100th percentile latency | warmup-indices | 46760.8 | 54720 | 0.170210946 | ms
100th percentile service time | warmup-indices | 46760.8 | 54720 | 0.170210946 | ms
error rate | warmup-indices | 0 | 0 | 0 | %
Min Throughput | prod-queries | 2.08 | 1.61 | -0.225961538 | ops/s
Mean Throughput | prod-queries | 8.765 | 10.09 | 0.151169424 | ops/s
Median Throughput | prod-queries | 8.765 | 4.43 | -0.494580719 | ops/s
Max Throughput | prod-queries | 15.445 | 24.24 | 0.569439948 | ops/s
50th percentile latency | prod-queries | 9.361765 | 10.2061 | 0.090189724 | ms
90th percentile latency | prod-queries | 11.2586 | 12.3034 | 0.092800171 | ms
99th percentile latency | prod-queries | 482.748 | 557.695 | 0.155250773 | ms
100th percentile latency | prod-queries | 503.6475 | 621.016 | 0.233036995 | ms
50th percentile service time | prod-queries | 9.361765 | 10.2061 | 0.090189724 | ms
90th percentile service time | prod-queries | 11.2586 | 12.3034 | 0.092800171 | ms
99th percentile service time | prod-queries | 482.748 | 557.695 | 0.155250773 | ms
100th percentile service time | prod-queries | 503.6475 | 621.016 | 0.233036995 | ms
error rate | prod-queries | 0 | 0 | 0 | %
Mean recall@k | prod-queries | 0.34 | 0.35 | 0.029411765 |
Mean recall@1 | prod-queries | 0.495 | 0.4 | -0.191919192 |
Introducing Loading Layer in Native KNN Engines
1. Goal
FAISS and Nmslib, two native engines, have been integral to delivering advanced vector search capabilities in OpenSearch. Alongside the official Lucene vector format, these engines have played a significant role in meeting the growing vector search needs of customers, especially in scenarios where Lucene alone might not suffice. However, the tight coupling in the way vector indexes are loaded during searches has made it challenging for OpenSearch to scale as a vector search solution across various Directory implementations. As of this writing, OpenSearch only supports FSDirectory, limiting its compatibility with other network-based Directory implementations, such as those backed by S3 or NFS.
This document provides an overview of a solution designed to cut this dependency, making OpenSearch compatible with multiple Directory implementations. In the following sections, it will guide the audience through the importance of introducing an abstract loading layer within each native engine. This layer will enable transparent loading of vector indexes, regardless of the specific Directory implementation used.
Related Issues
#1693 [ENHANCEMENT] Support disk based access of Vectors for Faiss HNSW algorithm
#10024 [Proposal] Tiered caching - OpenSearch: https://github.com/opensearch-project/OpenSearch/issues/10024
2. Scope
In this document, we focus exclusively on two types of native engines: FAISS and Nmslib. Lucene vector search is not covered here, as it is already integrated with Directory implementations.
Among the native engines, we will delve deeper into FAISS, while providing only high-level conceptual sketches for Nmslib. The primary reason for this is that, unlike FAISS, Nmslib lacks a loading interface (e.g., FAISS's IOReader). However, the approach in Nmslib will closely mirror the work in FAISS, where we first introduce a loading interface, then build a mediator that indirectly calls IndexInput to copy bytes from it.
As we are still in the proposal phase, detailed performance impacts will be addressed in the next phase, after benchmarks have been conducted and real data analyzed. In this initial phase, our focus is solely on creating a scalable interface that allows OpenSearch to integrate with multiple Directory implementations, while keeping the native engines unchanged. We will not be modifying any of the assumptions made by the native engines at this stage. Although further optimizations could be achieved by adjusting these assumptions, that will be the subject of future discussions and is beyond the scope of this document.
For example, one behavior that is out of scope for now, and that we leave as a future opportunity for improvement, is that FAISS loads all data into physical memory before performing a search; the same is true for Nmslib. Now, imagine a scenario where a user configures an S3-backed directory in OpenSearch. Because of the way FAISS operates, the S3-backed directory would need to download the requested vector index from S3, which could significantly worsen p99 query time, as KNNWeight lazily loads the index (Code). As a result, query execution would be delayed until the entire vector index has been fully downloaded from S3.
3. Problem Definitions
3.1. Problem We Are Solving
_To enable compatibility with various Directory implementations, we need to decouple from FSDirectory and make it extensible in OpenSearch._
The current implementation in OpenSearch assumes a vector index exists in a normal file system (e.g. ext4) and passes the absolute path of the vector index to the underlying native engines. For example, FAISS ends up invoking the `Index* read_index(const char* fname, int io_flags = 0)` method, and in Nmslib the `void LoadIndex(const string& location)` method is eventually called. In both cases, the native engines read bytes and load the entire index into physical memory. Although there appear to be better strategies for loading an index (lazy loading, mmap loading, etc.), as we aligned in 2. Scope, we will not attempt to alter their philosophy.

FSDirectory is the only Directory supported in OpenSearch. Due to this tight coupling, many network-based implementations (such as S3-backed or NFS-based Directories) and potential future Directory implementations cannot be integrated with native vector search engines in OpenSearch.
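For reference, FAISS itself already exposes both entry points; the coupling described above comes from the JNI layer always using the path-based one. A minimal sketch contrasting the two (both overloads exist in faiss/index_io.h):

```cpp
// Sketch only: contrasts the path-based loading used today with the
// IOReader-based overload that a loading layer could target instead.
#include <faiss/index_io.h>
#include <faiss/impl/io.h>

faiss::Index* loadFromPath(const char* absolutePath) {
    // Requires the index to live on a local file system (FSDirectory only).
    return faiss::read_index(absolutePath);
}

faiss::Index* loadFromReader(faiss::IOReader* reader) {
    // Bytes can come from anywhere, e.g. a mediator backed by Lucene's IndexInput.
    return faiss::read_index(reader);
}
```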
3.2. [Optional] Lucene Directory Design
Let’s briefly review Lucene’s Directory design before we continue with the discussion. If you're already familiar with its design and functionalities, feel free to skip ahead to the next section — 4. Requirements
This overview is included here rather than in the appendix, as it provides essential background before delving into the proposal.
3.2.1. Directory
Directory represents a logical file system with files organized in a tree structure, allowing users to perform CRUD operations (Create, Read, Update, Delete) through its APIs. The underlying Directory implementation must not only support creation and deletion but also provide an IndexInput stream, enabling the caller to read content from a specific offset.
For now, think of IndexInput as a random access interface, which we will revisit in section 3.2.3. IndexInput.
Well-known Directory implementations include FSDirectory and its subclasses, such as MMapDirectory and NIOFSDirectory.
3.2.2. DataInput
DataInput provides sequential read APIs and serves as the base class for IndexInput, which will be covered in the next section. Each DataInput implementation is expected to internally track the last read offset. However, DataInput itself does not offer an API for updating this offset. To modify the offset, users must inherit from DataInput and define their own IndexInput class.
3.2.3. IndexInput
IndexInput inherits from DataInput and includes an API for resetting the offset, in addition to all the features provided by DataInput. The caller can use this API to update the internal offset. Once updated, all read operations in DataInput will start from the new reset offset.
Directory provides an IndexInput as the read stream. While loading a vector index into physical memory typically involves sequential reading and does not necessitate random access, we will use IndexInput for sequential reading, as it is the type returned by the Directory.
3.2.4. DataOutput
An abstract base class for performing write operations on bytes. This serves as the foundation for the IndexOutput class, which will be discussed in detail in the next section. Each DataOutput implementation must internally track the next byte offset for writing.
3.2.5. IndexOutput
IndexOutput inherits from the DataOutput class with extra getter methods that return the internal offset where the next byte will be written. It provides two APIs: 1. A basic getter method. 2. An aligned offset adjustment method.
Note that the aligned offset method appends dummy bytes to ensure the offset is a multiple of the given alignment. For example, if the current offset is 121 and the required alignment is 8, the method will append 7 dummy bytes to adjust the offset to 128, which is a multiple of 8.
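As a small illustration of that padding rule (my own example, not Lucene code), the number of dummy bytes is (alignment - offset % alignment) % alignment:

```cpp
// Illustration of the aligned-offset arithmetic described above.
#include <cassert>
#include <cstdint>

uint64_t alignedOffset(uint64_t offset, uint64_t alignment) {
    const uint64_t padding = (alignment - (offset % alignment)) % alignment;
    return offset + padding;
}

int main() {
    assert(alignedOffset(121, 8) == 128);  // 7 dummy bytes appended
    assert(alignedOffset(128, 8) == 128);  // already aligned, nothing appended
    return 0;
}
```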
4. Requirements
4.1. Functional Requirements
4.2. Non-functional Requirements.
5. Solution Proposal
5.1. High Level Overview
5.1.1. Loading Vector Index (FAISS only)
[Image: Image.jpg]
5.1.2. Constructing Vector Index (FAISS only)
[Image: Image.jpg]
5.2. Low Level Details - FAISS
5.2.1. [Reading] Vector Index Loading Low Level Details
5.2.1.1. Define C++ Mediator.
The mediator component is responsible for invoking the IndexInput instance to obtain bytes and then copying them into the specified memory location in C++ (e.g., performing a Java-to-C++ byte copy).
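A minimal sketch of that idea is below. It assumes a Java-side wrapper around IndexInput exposing an int copyBytes(byte[] dst, int len) method; the actual class and method names in the k-NN JNI layer may differ.

```cpp
// Sketch of a C++ mediator performing the Java-to-C++ byte copy via JNI.
// The Java method signature ("copyBytes([BI)I") is an assumption for illustration.
#include <jni.h>
#include <algorithm>
#include <cstdint>
#include <cstring>

class IndexInputMediatorSketch {
public:
    IndexInputMediatorSketch(JNIEnv* env, jobject indexInputWrapper)
        : env_(env), wrapper_(indexInputWrapper) {
        jclass clazz = env_->GetObjectClass(wrapper_);
        copyBytesMethod_ = env_->GetMethodID(clazz, "copyBytes", "([BI)I");
        stagingBuffer_ = env_->NewByteArray(kBufferSize);
    }

    // Pull `nbytes` from the Java stream and copy them into `dst`, kBufferSize at a time.
    void copyBytes(size_t nbytes, uint8_t* dst) {
        while (nbytes > 0) {
            const jint chunk = static_cast<jint>(std::min<size_t>(nbytes, kBufferSize));
            const jint read = env_->CallIntMethod(wrapper_, copyBytesMethod_, stagingBuffer_, chunk);
            // GetPrimitiveArrayCritical avoids an extra copy of the Java array in OpenJDK.
            void* src = env_->GetPrimitiveArrayCritical(stagingBuffer_, nullptr);
            std::memcpy(dst, src, static_cast<size_t>(read));
            env_->ReleasePrimitiveArrayCritical(stagingBuffer_, src, JNI_ABORT);
            dst += read;
            nbytes -= static_cast<size_t>(read);
        }
    }

private:
    static constexpr jint kBufferSize = 4 * 1024;  // 4KB staging buffer

    JNIEnv* env_;
    jobject wrapper_;        // local reference, valid within the current JNI call
    jmethodID copyBytesMethod_;
    jbyteArray stagingBuffer_;
};
```

The FaissMediatorWrapper mentioned in 5.2.1.6 then only needs to forward faiss::IOReader::operator() calls to a copyBytes() method like this one.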
5.2.1.2. NativeMemoryEntryContext.IndexEntryContext
IndexEntryContext contains essential information for loading the vector index. The current implementation only includes the logical index path, which it uses to construct a physical absolute path in the file system for access. The constructor will be updated to include an additional parameter, Directory, which will serve as the source of IndexInput.
5.2.1.3. NativeMemoryLoadStrategy.IndexLoadStrategy
IndexLoadStrategy now falls back to the baseline approach if the given Directory is file-based. Otherwise, it allows the native engine to fetch bytes from IndexInput. This change helps prevent potential performance degradation due to JNI call overheads and redundant byte copying.
5.2.1.4 File Watcher Integration
Whenever NativeMemoryLoadStrategy delegates the task of loading an index to native engines, it attaches a monitor object that removes the corresponding entry from the cached map managed by NativeMemoryCacheManager when a vector file is removed from the Directory. (Code) This behavior will remain unchanged even after extending the current implementation to pass IndexInput to native engines. The cached pair in the map will continue to be properly removed and cleaned up as before.
5.2.1.5. JNIService
JNIService serves as the entry point for interacting with the underlying native engines. Similar to how the current implementation passes the Java string value of the index path, it will now pass the reference to the provided IndexInput.
5.2.1.6. FaissService, Glue Component
The glue component is responsible for creating an adapter for [IOReader](https://github.com/facebookresearch/faiss/blob/924c24db23b00053fc1c49e67d8787f0a3460ceb/faiss/impl/io.h#L27) and passing it to the FAISS API. It first creates a NativeEngineIndexInputMediator on the local stack, then wraps it with a FaissMediatorWrapper.

5.2.2. [Writing] Constructing Vector Index Low Level Details
By the time a vector index is requested to be written to the underlying storage, the vector graph structure should already be properly trained and resident in memory. The low-level details below are intended to abstract away the I/O processing logic, making the persistence of data in a file system seamless and transparent. As a result, the vector transfer processing logic will remain unchanged, even after the proposed introduction of an intermediate layer in native engines.
5.2.2.1. Define IndexOutput wrapper
5.2.2.2. Define C++ mediator
To minimize frequent context switching between C++ and Java, the writer mediator first attempts to retain bytes in its buffer. Once the buffer is full, it copies the bytes (e.g. a uint8_t[]) into a Java buffer (e.g. a byte[]) and then triggers IndexOutputWithBuffer to flush the bytes via the underlying IndexOutput.

As a result, it needs double the memory copies compared to the baseline (copying bytes in C++ first, then a second copy into the Java byte[]). However, I don't believe this will significantly impact performance, as glibc's memcpy typically achieves throughput between 10-30 GB/sec. In the worst case, this would add only a few seconds to the process of building a vector index. Most of the performance degradation will come from Lucene's IndexOutput implementation.
For the performance details, please refer to 7.2. Analysis.
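To make the buffering idea concrete, here is a rough sketch (not the actual k-NN implementation; the Java-side void writeBytes(byte[] src, int len) method standing in for something like IndexOutputWithBuffer is assumed for illustration):

```cpp
// Sketch of the buffered writer mediator: bytes are staged in a native buffer,
// and only a full buffer triggers one C++ -> Java copy plus one JNI call.
#include <jni.h>
#include <cstdint>
#include <cstring>

class IndexOutputMediatorSketch {
public:
    IndexOutputMediatorSketch(JNIEnv* env, jobject indexOutputWrapper)
        : env_(env), wrapper_(indexOutputWrapper) {
        jclass clazz = env_->GetObjectClass(wrapper_);
        writeBytesMethod_ = env_->GetMethodID(clazz, "writeBytes", "([BI)V");
        javaBuffer_ = env_->NewByteArray(kBufferSize);
    }

    void writeBytes(const uint8_t* src, size_t nbytes) {
        while (nbytes > 0) {
            const size_t room = kBufferSize - used_;
            const size_t chunk = nbytes < room ? nbytes : room;
            std::memcpy(nativeBuffer_ + used_, src, chunk);  // first copy: C++ -> C++
            used_ += chunk;
            src += chunk;
            nbytes -= chunk;
            if (used_ == kBufferSize) {
                flush();
            }
        }
    }

    void flush() {
        if (used_ == 0) {
            return;
        }
        // Second copy: C++ -> Java byte[], then one JNI call into Lucene's IndexOutput.
        env_->SetByteArrayRegion(javaBuffer_, 0, static_cast<jsize>(used_),
                                 reinterpret_cast<const jbyte*>(nativeBuffer_));
        env_->CallVoidMethod(wrapper_, writeBytesMethod_, javaBuffer_, static_cast<jint>(used_));
        used_ = 0;
    }

private:
    static constexpr size_t kBufferSize = 64 * 1024;  // 64KB staging buffer

    JNIEnv* env_;
    jobject wrapper_;
    jmethodID writeBytesMethod_;
    jbyteArray javaBuffer_;
    uint8_t nativeBuffer_[kBufferSize];
    size_t used_ = 0;
};
```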
[Image: Image.jpg]
5.2.2.3. Expanding IndexService
5.3. Miscellaneous
Since we will be fully relying on Lucene's Directory, a few DTO classes now need to include a Directory instance as a member field.

Also, a few classes should be modified to use the passed Directory instance for building and loading vector indices, instead of using the casted FSDirectory.
.5.4. Pros and Cons
5.4.1. Pros
5.4.2. Cons
5.5. [P1] Nmslib - Introducing Similar Concept of IOReader in FAISS.
We need to implement some changes in Nmslib to make the loading and writing layers available. Since only two index types are currently used (both Hnsw variants), the definition of Hnsw is the only place requiring modification.
Currently, Hnsw has methods that accept std::istream and std::ostream for reading and writing bytes. However, these methods are private, which prevents JNI from passing a stream object and subsequently letting it utilize Lucene's IndexInput and IndexOutput for IO operations.

5.5.1. Required Patches [DONE]
5.5.1.1. Loading With Stream
5.5.1.2. Writing With Stream
5.5.2. Read Stream Buffer
5.5.3. Write Stream Buffer
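For reference, the reader abstraction that the loading patch threads through Hnsw<dist_t>::LoadOptimizedIndex looks roughly like the following (a sketch; only the two operations the patched loader actually uses are shown, and names may differ from the final k-NN code):

```cpp
// Sketch of the stream abstraction consumed by the patched NMSLIB loader.
#include <cstddef>

class NmslibIOReader {
public:
    virtual ~NmslibIOReader() = default;

    // Copy `nbytes` from the underlying source (e.g. Lucene's IndexInput via JNI) into `dst`.
    // The patched loader calls this once per refill of its 64KB buffer rather than per neighbor list.
    virtual void read(char* dst, size_t nbytes) = 0;

    // Bytes left in the stream; used to size the final, partial refill.
    virtual size_t remaining() const = 0;
};
```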
6. Backward Compatibility And Miscellaneous Performance Issues
It should fall back to the existing implementation when the given Directory is file-based, ensuring no backward compatibility issues. Apart from the inherent overhead of IndexInput (note that we cannot prevent users from importing inefficient IndexInput implementations in OpenSearch!), what are the costs of the proposed solution?
From a performance perspective, the primary impact will come from context switching between Java and C++ due to JNI calls. However, since each JNI call transition typically takes only nanoseconds, the overall performance degradation is expected to be minimal. Thus, the performance impact from JNI calls is likely to be negligible.
From a memory consumption perspective, additional memory allocation will be limited to a constant factor, with a maximum of approximately 4KB for the copy buffer. Additionally, because GetPrimitiveArrayCritical provides a pointer to the primitive array without performing a data copy in OpenJDK, we don't need extra memory allocations beyond the 4KB buffer.
6.1. File Watching Mechanism Issue
With this change, we need to modify the file-watching mechanism, as we are no longer relying on OS-level files. The purpose of the file watcher is to allow us to evict outdated vector indexes as soon as the corresponding file is deleted. This ensures we maintain the necessary memory space efficiently.
I realized that including this section would make the document overly lengthy, so I will move this topic to a separate sub-document where all alternatives will be explored in detail.
7. Performance Benchmark
7.1. Benchmark Environment
7.1.1. Traffic Loader
7.1.2. OpenSearch Engine
vm.max_map_count=262144
7.2. Analysis
For the details, please refer to Appendix 1. Performance Details.
The best benchmark result for each case was selected after three trials. Overall search performance remained consistent, with the baseline occasionally outperforming the candidate and vice versa. This variation is expected, as the loading layer merely involves reading bytes into memory. Since the query process operates on in-memory data, introducing this layer should not significantly impact search performance.
At worst, we can see a 20% performance degradation in bulk ingestion, resulting in about 2 seconds of added latency. This is somewhat expected, as noted in 5.2.2.2: the writing layer needs double the memory copies compared to the baseline (copying bytes in C++ first, then a second copy into the Java byte[]).
Milestones
Prepare the next round of design meetings for the write part.
POC (reading part only)
Productionize (reading part only)
Propose an introduction of the loading layer in Nmslib.
9. Demo : S3 Vector Index Snapshot
[Image: Image.jpg]
9.1. Demo Steps
For the detailed commands, please refer to Appendix 3. Demo Scripts.
Appendix 1. Performance Details
Appendix 2. Loading Time Comparison
The numbers below were measured through `time curl -X GET http://localhost:9200/_plugins/_knn/warmup/target_index`. I made two different experiments of loading a FAISS vector index with different buffer sizes (the system page cache was dropped between runs via `sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches'`).

Conclusion: The buffer size in InputIndexWithBuffer does not impact loading time, so there's no reason to use more than a 4KB buffer. If anything, a larger buffer costs more space and takes more time inside the JNI critical section.

When a new index file has just been created and is not yet system-cached, there is no meaningful difference in loading time between the baseline and the streaming approach. But when the file is already in the system cache, the baseline (i.e. the one using fread) is slightly faster than the streaming approach (3.584 sec vs. 4.664 sec). Considering that this case is rare (except when rebooting an engine during a rolling restart), and that in most cases a newly created vector index will be loaded, I don't think it will seriously deteriorate overall performance. Once an index has been loaded, queries are processed against the in-memory data structure, so there was no search performance difference between the baseline and the streaming version. (Refer to the table above for more details.)

Experiment
Index size : 6.4G
1. Baseline (Using fread)
2. Using Stream
2.1. 4KB
2.2. 64KB
2.3. 1M
Appendix 3. Demo Scripts
0. Set up AWS credential
./bin/opensearch-keystore create
./bin/opensearch-keystore add s3.client.default.access_key
./bin/opensearch-keystore add s3.client.default.secret_key
1. Make sure S3 bucket is empty.
https://us-east-1.console.aws.amazon.com/s3/buckets/kdooyong-opensearch?region=us-east-1&bucketType=general&tab=objects
2. Create an index.
3. Bulk ingest vector data.
4. Run a query and make sure we are getting a valid result.
5. Create a repository.
6. Look up the repository we created.
7. Take the snapshot
8. Get the snapshot info.
9. Delete the index from OpenSearch. Confirm that we deleted the index in OpenSearch.
10. Confirm we don’t have any indices
11. Restore the searchable index. Confirm that now we have a vector index restored.
12. Run a query against the restored vector index. Make sure we are getting a valid result.