opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[META] Protobuf for Search API #10684

Open VachaShah opened 10 months ago

VachaShah commented 10 months ago

Proposal

The experiment using protobuf in the _cat/nodes API (see https://github.com/opensearch-project/OpenSearch/issues/6844#issuecomment-1742250229) showed a 15-30% improvement depending on the size of the cluster. Based on that, we can assess the benefits of protobuf for the Search API in terms of serialization and de-serialization of requests and responses, both between nodes and at the REST level.

Next Steps

Do a similar experiment for the Search API to understand the performance improvements using protobuf. (This is a work in progress)

Details

In order to experiment and see incremental benefits, I am targeting the following classes for conversion to protobuf: SearchRequest, SearchResponse, SearchPhaseResult, FetchSearchResult, ShardFetchRequest, ShardSearchRequest, QueryFetchSearchResult, QuerySearchRequest, QuerySearchResult. Classes related to TransportSearchAction also need to support protobuf requests and responses.

Code changes

The code changes for the experiment are being added in a branch on my fork: https://github.com/VachaShah/OpenSearch/compare/poc-cat-nodes-protobuf...VachaShah:OpenSearch:poc-search-protobuf?expand=1

Sub tasks/Milestones

Next steps as discussed with @msfroh and @getsaurabh02. Let me know if I missed something here.

Note: All of the above changes go behind an experimental feature flag. Once the incremental changes are in for the Search API:

Related

navneet1v commented 10 months ago

I would like to +1 this feature. This will be a great win, especially for vector search, where the search payload grows with the dimension of the vectors.

saratvemulapalli commented 10 months ago

@navneet1v is there data we could look at to see where the bottleneck is for vector search?

navneet1v commented 10 months ago

@saratvemulapalli I don't have data available, because we remove the _source field while running the benchmarks. What I can do is run a small benchmark and provide you that info. Will that work?

saratvemulapalli commented 10 months ago

Yeah, what we'd like to know is whether serialization/de-serialization is adding latency, and whether that happens during the query phase, the fetch phase, etc. This will help us narrow down which area to work on first. The idea so far (theoretically) is that we will work on the fetch phase, where the responses are serialized on the data nodes and de-serialized on the co-ordinator, since that is where protobuf should bring the most benefit.

navneet1v commented 10 months ago

For vector search it will be the fetch phase only, because that is where a vector with, say, 100 dimensions gets serialized and de-serialized per document. If every float is represented as 4 bytes, that is 400 bytes for the vector alone.

The same payload also gets transported over the wire to customers. Hence, once we start making this change, it will help a lot.
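To make the arithmetic above concrete, here is a minimal sketch that compares a fixed-width 4-byte-per-float encoding of a 100-dimension vector with the same vector rendered as JSON text. The function name `vector_payload_sizes` and the random test vector are made up for illustration; the binary side uses Python's `struct` module as a stand-in for any fixed-width float encoding and is not the protobuf implementation from the POC.

```python
import json
import random
import struct


def vector_payload_sizes(dimension, seed=0):
    """Return (binary_bytes, json_bytes) for one random vector of the given dimension."""
    rng = random.Random(seed)
    vector = [rng.uniform(-1.0, 1.0) for _ in range(dimension)]
    # Fixed-width encoding: every float32 takes exactly 4 bytes.
    binary = struct.pack(f"<{dimension}f", *vector)
    # Text encoding: every float is written out as decimal digits plus separators.
    text = json.dumps(vector).encode("utf-8")
    return len(binary), len(text)


binary_size, json_size = vector_payload_sizes(100)
print(f"binary: {binary_size} bytes, JSON text: {json_size} bytes")
```

The binary size is always 4 bytes times the dimension, while the JSON size depends on how many digits each value needs, so the gap grows with both the dimension and the number of documents returned.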

The idea so far (theoretically) is we will work on fetch phase where the responses are serialized on data nodes and de-serialized on co-ordinator which will have the best benefits for protobuf.

+1 on this.

I will see what I can provide from the benchmarks, but I can surely help set up the benchmark code so that you can run it against different custom OpenSearch builds to get more numbers if you want.

VachaShah commented 10 months ago

That would be super helpful @navneet1v!

navneet1v commented 10 months ago

@saratvemulapalli, @VachaShah Here is one of the benchmarking notebooks, which you can use to test k-NN with any OpenSearch cluster with Security enabled.

https://github.com/navneet1v/OpenSearchVectorDB/blob/main/benchmarking/sift-128/sift-128-benchmarking.ipynb

This is something I was working on. It replicates how we do perf testing in k-NN. The good thing about it is that it is easy to run.

import time

import numpy as np
from tqdm.notebook import tqdm

# X_TEST, client and vector_index_name are defined in earlier notebook cells.

# Generate one k-NN search body per test vector.
def searchQueryGen(input_array=X_TEST):
    for vec in input_array:
        yield {
            "_source": False,  # Don't get the source, as this impacts latency
            "size": 100,
            "query": {
                "knn": {
                    "vec": {
                        "vector": vec.tolist(),
                        "k": 100
                    }
                }
            }
        }

neighbors_lists = []
search_latency = []  # client-side wall-clock time per query, in seconds
took_time = []       # "took" value reported in each response, in milliseconds
for query in tqdm(searchQueryGen(), total=len(X_TEST)):
    start = time.time()
    search_response = client.search(body=query, index=vector_index_name, _source=False, docvalue_fields=["_id"], stored_fields="_none_")
    end = time.time()
    search_latency.append(end - start)
    took_time.append(search_response["took"])
    search_hits = search_response['hits']['hits']
    # _id is read from doc values, so it arrives under "fields"
    search_neighbors = [int(hit["fields"]["_id"][0]) for hit in search_hits]
    neighbors_lists.append(search_neighbors)

You can remove all the optimizations that we have added:

  1. Not getting _source for documents.
  2. Not getting stored fields.
  3. Getting _id as a doc value rather than from a stored field.

Please let me know if you need any more details, happy to help.
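One way to use the two lists collected by the loop above is to compare the client-side wall-clock latency with the `took` value from each response. The gap between them roughly covers what happens outside the server-side search itself, such as the HTTP round trip and client-side response parsing. The sketch below assumes `search_latency` is in seconds and `took_time` is in milliseconds, as in the notebook; the helper name `client_overhead_ms` and the sample numbers are made up for illustration.

```python
from statistics import mean


def client_overhead_ms(search_latency_s, took_ms):
    """Per-query gap (ms) between client wall-clock latency and the reported took."""
    return [latency * 1000.0 - took for latency, took in zip(search_latency_s, took_ms)]


# Made-up sample values standing in for search_latency and took_time.
sample_latency_s = [0.012, 0.015, 0.011]
sample_took_ms = [8, 10, 7]

overhead = client_overhead_ms(sample_latency_s, sample_took_ms)
print(f"mean client-side overhead: {mean(overhead):.2f} ms")
```

Running this once with the optimizations in place and once with them removed gives a rough picture of how much of the added latency sits outside `took`.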

getsaurabh02 commented 10 months ago

This is super exciting @VachaShah!! Would you like to share insights on which Transport classes are good candidates for trying out and measuring the improvements first? Do you think instrumenting the node-to-node interaction for the Query and Fetch phases, especially targeting implementations of SearchPhaseResult, could be a good idea?

This could include heavily exercised code paths during query executions such as QuerySearchResult, FetchSearchResult, ScrollQuerySearchResult and more.

VachaShah commented 10 months ago

Thank you @getsaurabh02! I have added the classes that I am targeting first in the issue description. They include the Query and Fetch phase implementations of SearchPhaseResult and other related request, response and transport action classes.

VachaShah commented 9 months ago

The code for an end-to-end working POC of a _search_protobuf API is built on top of the _cat/nodes API POC. It is a version of the _search API in which requests, responses and node-to-node communication use protobuf. The code can be found in this diff: https://github.com/VachaShah/OpenSearch/compare/poc-cat-nodes-protobuf...VachaShah:OpenSearch:poc-search-protobuf?expand=1

The current POC covers the QUERY_THEN_FETCH search type, with some embedded objects in the response carried as bytes. The next step is to convert those embedded objects into proto messages as well.

I am going to micro-benchmark the protobuf-integrated API to compare it with the original search API.

Next steps

hdhalter commented 9 months ago

@VachaShah - Will this require documentation? If so, can you please create a doc issue, let me know who will be submitting the doc PR, and add it to the unified tracker project? Thank you!

VachaShah commented 8 months ago

@hdhalter This does not require documentation as of now since it is a performance improvement. I am going to divide this meta issue into sub tasks and if any of them have a need for documentation, I will make sure to create a doc issue for those.

VachaShah commented 8 months ago

In order to divide this issue into deliverables, I am converting it into a meta issue. Sub tasks will be listed in the issue description.

saratvemulapalli commented 8 months ago

Thanks @VachaShah for breaking it down. Tagging @dbwiddis who is interested in contributing.

VachaShah commented 8 months ago

Benchmarks

The benchmarks were taken using opensearch-benchmark for both the original search API and the protobuf version of the API, for the default searches in the benchmark. The workload used is nyc_taxis.

5-node cluster

Seeing a 19.1% decrease in latency for default search with protobuf integration.

Original API

| Metric | Task | Value | Unit |
| --- | --- | --- | --- |
| Cumulative indexing time of primary shards | | 0.00248333 | min |
| Min cumulative indexing time across primary shards | | 0.00248333 | min |
| Median cumulative indexing time across primary shards | | 0.00248333 | min |
| Max cumulative indexing time across primary shards | | 0.00248333 | min |
| Cumulative indexing throttle time of primary shards | | 0 | min |
| Min cumulative indexing throttle time across primary shards | | 0 | min |
| Median cumulative indexing throttle time across primary shards | | 0 | min |
| Max cumulative indexing throttle time across primary shards | | 0 | min |
| Cumulative merge time of primary shards | | 0 | min |
| Cumulative merge count of primary shards | | 0 | |
| Min cumulative merge time across primary shards | | 0 | min |
| Median cumulative merge time across primary shards | | 0 | min |
| Max cumulative merge time across primary shards | | 0 | min |
| Cumulative merge throttle time of primary shards | | 0 | min |
| Min cumulative merge throttle time across primary shards | | 0 | min |
| Median cumulative merge throttle time across primary shards | | 0 | min |
| Max cumulative merge throttle time across primary shards | | 0 | min |
| Cumulative refresh time of primary shards | | 0.00153333 | min |
| Cumulative refresh count of primary shards | | 5 | |
| Min cumulative refresh time across primary shards | | 0.00153333 | min |
| Median cumulative refresh time across primary shards | | 0.00153333 | min |
| Max cumulative refresh time across primary shards | | 0.00153333 | min |
| Cumulative flush time of primary shards | | 0 | min |
| Cumulative flush count of primary shards | | 0 | |
| Min cumulative flush time across primary shards | | 0 | min |
| Median cumulative flush time across primary shards | | 0 | min |
| Max cumulative flush time across primary shards | | 0 | min |
| Total Young Gen GC time | | 0.008 | s |
| Total Young Gen GC count | | 1 | |
| Total Old Gen GC time | | 0 | s |
| Total Old Gen GC count | | 0 | |
| Store size | | 0.000252848 | GB |
| Translog size | | 5.12227e-08 | GB |
| Heap used for segments | | 0 | MB |
| Heap used for doc values | | 0 | MB |
| Heap used for terms | | 0 | MB |
| Heap used for norms | | 0 | MB |
| Heap used for points | | 0 | MB |
| Heap used for stored fields | | 0 | MB |
| Segment count | | 8 | |
| Min Throughput | index | 26349.6 | docs/s |
| Mean Throughput | index | 26349.6 | docs/s |
| Median Throughput | index | 26349.6 | docs/s |
| Max Throughput | index | 26349.6 | docs/s |
| 50th percentile latency | index | 31.8154 | ms |
| 100th percentile latency | index | 38.6998 | ms |
| 50th percentile service time | index | 31.8154 | ms |
| 100th percentile service time | index | 38.6998 | ms |
| error rate | index | 0 | % |
| Min Throughput | wait-until-merges-finish | 163.69 | ops/s |
| Mean Throughput | wait-until-merges-finish | 163.69 | ops/s |
| Median Throughput | wait-until-merges-finish | 163.69 | ops/s |
| Max Throughput | wait-until-merges-finish | 163.69 | ops/s |
| 100th percentile latency | wait-until-merges-finish | 5.62406 | ms |
| 100th percentile service time | wait-until-merges-finish | 5.62406 | ms |
| error rate | wait-until-merges-finish | 0 | % |
| Min Throughput | default | 83.27 | ops/s |
| Mean Throughput | default | 83.27 | ops/s |
| Median Throughput | default | 83.27 | ops/s |
| Max Throughput | default | 83.27 | ops/s |
| 100th percentile latency | default | 17.6042 | ms |
| 100th percentile service time | default | 5.37617 | ms |
| error rate | default | 0 | % |
| Min Throughput | range | 112.38 | ops/s |
| Mean Throughput | range | 112.38 | ops/s |
| Median Throughput | range | 112.38 | ops/s |
| Max Throughput | range | 112.38 | ops/s |
| 100th percentile latency | range | 14.9054 | ms |
| 100th percentile service time | range | 5.7996 | ms |
| error rate | range | 0 | % |

API with Protobuf

| Metric | Task | Value | Unit |
| --- | --- | --- | --- |
| Cumulative indexing time of primary shards | | 0.00171667 | min |
| Min cumulative indexing time across primary shards | | 0.00171667 | min |
| Median cumulative indexing time across primary shards | | 0.00171667 | min |
| Max cumulative indexing time across primary shards | | 0.00171667 | min |
| Cumulative indexing throttle time of primary shards | | 0 | min |
| Min cumulative indexing throttle time across primary shards | | 0 | min |
| Median cumulative indexing throttle time across primary shards | | 0 | min |
| Max cumulative indexing throttle time across primary shards | | 0 | min |
| Cumulative merge time of primary shards | | 0 | min |
| Cumulative merge count of primary shards | | 0 | |
| Min cumulative merge time across primary shards | | 0 | min |
| Median cumulative merge time across primary shards | | 0 | min |
| Max cumulative merge time across primary shards | | 0 | min |
| Cumulative merge throttle time of primary shards | | 0 | min |
| Min cumulative merge throttle time across primary shards | | 0 | min |
| Median cumulative merge throttle time across primary shards | | 0 | min |
| Max cumulative merge throttle time across primary shards | | 0 | min |
| Cumulative refresh time of primary shards | | 0.001 | min |
| Cumulative refresh count of primary shards | | 5 | |
| Min cumulative refresh time across primary shards | | 0.001 | min |
| Median cumulative refresh time across primary shards | | 0.001 | min |
| Max cumulative refresh time across primary shards | | 0.001 | min |
| Cumulative flush time of primary shards | | 0 | min |
| Cumulative flush count of primary shards | | 0 | |
| Min cumulative flush time across primary shards | | 0 | min |
| Median cumulative flush time across primary shards | | 0 | min |
| Max cumulative flush time across primary shards | | 0 | min |
| Total Young Gen GC time | | 0.004 | s |
| Total Young Gen GC count | | 1 | |
| Total Old Gen GC time | | 0 | s |
| Total Old Gen GC count | | 0 | |
| Store size | | 0.000253422 | GB |
| Translog size | | 5.12227e-08 | GB |
| Heap used for segments | | 0 | MB |
| Heap used for doc values | | 0 | MB |
| Heap used for terms | | 0 | MB |
| Heap used for norms | | 0 | MB |
| Heap used for points | | 0 | MB |
| Heap used for stored fields | | 0 | MB |
| Segment count | | 8 | |
| Min Throughput | index | 31452.4 | docs/s |
| Mean Throughput | index | 31452.4 | docs/s |
| Median Throughput | index | 31452.4 | docs/s |
| Max Throughput | index | 31452.4 | docs/s |
| 50th percentile latency | index | 25.6299 | ms |
| 100th percentile latency | index | 26.2214 | ms |
| 50th percentile service time | index | 25.6299 | ms |
| 100th percentile service time | index | 26.2214 | ms |
| error rate | index | 0 | % |
| Min Throughput | wait-until-merges-finish | 292.51 | ops/s |
| Mean Throughput | wait-until-merges-finish | 292.51 | ops/s |
| Median Throughput | wait-until-merges-finish | 292.51 | ops/s |
| Max Throughput | wait-until-merges-finish | 292.51 | ops/s |
| 100th percentile latency | wait-until-merges-finish | 3.01014 | ms |
| 100th percentile service time | wait-until-merges-finish | 3.01014 | ms |
| error rate | wait-until-merges-finish | 0 | % |
| Min Throughput | default | 110.58 | ops/s |
| Mean Throughput | default | 110.58 | ops/s |
| Median Throughput | default | 110.58 | ops/s |
| Max Throughput | default | 110.58 | ops/s |
| 100th percentile latency | default | 14.2405 | ms |
| 100th percentile service time | default | 3.97145 | ms |
| error rate | default | 0 | % |
| Min Throughput | range | 142.97 | ops/s |
| Mean Throughput | range | 142.97 | ops/s |
| Median Throughput | range | 142.97 | ops/s |
| Max Throughput | range | 142.97 | ops/s |
| 100th percentile latency | range | 11.715 | ms |
| 100th percentile service time | range | 4.50792 | ms |
| error rate | range | 0 | % |

10-node cluster

Seeing a 23.03% decrease in latency for default search with protobuf integration.
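For reference, the headline percentages can be recomputed from the "100th percentile latency" rows of the `default` task in the tables of this comment. The sketch below assumes the figure is the relative drop from the original API to the protobuf API on that row; the function name `percent_decrease` is made up for illustration.

```python
def percent_decrease(baseline, candidate):
    """Relative decrease from baseline to candidate, in percent."""
    return (baseline - candidate) / baseline * 100.0


# 100th percentile latency (ms) for the `default` task, copied from the tables.
five_node = percent_decrease(17.6042, 14.2405)
ten_node = percent_decrease(18.8456, 15.7351)

print(f"5 nodes: {five_node:.1f}%  10 nodes: {ten_node:.1f}%")
```

With the values as they appear in this comment, the 5-node rows reproduce the quoted 19.1%, while the 10-node rows give about 16.5% rather than 23.03%, so the 10-node headline may have been taken from a different run or metric.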

Original API

| Metric | Task | Value | Unit |
| --- | --- | --- | --- |
| Cumulative indexing time of primary shards | | 0.00198333 | min |
| Min cumulative indexing time across primary shards | | 0.00198333 | min |
| Median cumulative indexing time across primary shards | | 0.00198333 | min |
| Max cumulative indexing time across primary shards | | 0.00198333 | min |
| Cumulative indexing throttle time of primary shards | | 0 | min |
| Min cumulative indexing throttle time across primary shards | | 0 | min |
| Median cumulative indexing throttle time across primary shards | | 0 | min |
| Max cumulative indexing throttle time across primary shards | | 0 | min |
| Cumulative merge time of primary shards | | 0 | min |
| Cumulative merge count of primary shards | | 0 | |
| Min cumulative merge time across primary shards | | 0 | min |
| Median cumulative merge time across primary shards | | 0 | min |
| Max cumulative merge time across primary shards | | 0 | min |
| Cumulative merge throttle time of primary shards | | 0 | min |
| Min cumulative merge throttle time across primary shards | | 0 | min |
| Median cumulative merge throttle time across primary shards | | 0 | min |
| Max cumulative merge throttle time across primary shards | | 0 | min |
| Cumulative refresh time of primary shards | | 0.00126667 | min |
| Cumulative refresh count of primary shards | | 5 | |
| Min cumulative refresh time across primary shards | | 0.00126667 | min |
| Median cumulative refresh time across primary shards | | 0.00126667 | min |
| Max cumulative refresh time across primary shards | | 0.00126667 | min |
| Cumulative flush time of primary shards | | 0 | min |
| Cumulative flush count of primary shards | | 0 | |
| Min cumulative flush time across primary shards | | 0 | min |
| Median cumulative flush time across primary shards | | 0 | min |
| Max cumulative flush time across primary shards | | 0 | min |
| Total Young Gen GC time | | 0.007 | s |
| Total Young Gen GC count | | 1 | |
| Total Old Gen GC time | | 0 | s |
| Total Old Gen GC count | | 0 | |
| Store size | | 0.000223138 | GB |
| Translog size | | 5.12227e-08 | GB |
| Heap used for segments | | 0 | MB |
| Heap used for doc values | | 0 | MB |
| Heap used for terms | | 0 | MB |
| Heap used for norms | | 0 | MB |
| Heap used for points | | 0 | MB |
| Heap used for stored fields | | 0 | MB |
| Segment count | | 5 | |
| Min Throughput | index | 24834.7 | docs/s |
| Mean Throughput | index | 24834.7 | docs/s |
| Median Throughput | index | 24834.7 | docs/s |
| Max Throughput | index | 24834.7 | docs/s |
| 50th percentile latency | index | 41.3685 | ms |
| 100th percentile latency | index | 45.1308 | ms |
| 50th percentile service time | index | 41.3685 | ms |
| 100th percentile service time | index | 45.1308 | ms |
| error rate | index | 0 | % |
| Min Throughput | wait-until-merges-finish | 167.27 | ops/s |
| Mean Throughput | wait-until-merges-finish | 167.27 | ops/s |
| Median Throughput | wait-until-merges-finish | 167.27 | ops/s |
| Max Throughput | wait-until-merges-finish | 167.27 | ops/s |
| 100th percentile latency | wait-until-merges-finish | 5.51836 | ms |
| 100th percentile service time | wait-until-merges-finish | 5.51836 | ms |
| error rate | wait-until-merges-finish | 0 | % |
| Min Throughput | default | 90.29 | ops/s |
| Mean Throughput | default | 90.29 | ops/s |
| Median Throughput | default | 90.29 | ops/s |
| Max Throughput | default | 90.29 | ops/s |
| 100th percentile latency | default | 18.8456 | ms |
| 100th percentile service time | default | 6.55805 | ms |
| error rate | default | 0 | % |
| Min Throughput | range | 100.27 | ops/s |
| Mean Throughput | range | 100.27 | ops/s |
| Median Throughput | range | 100.27 | ops/s |
| Max Throughput | range | 100.27 | ops/s |
| 100th percentile latency | range | 19.1147 | ms |
| 100th percentile service time | range | 8.94176 | ms |
| error rate | range | 0 | % |

API with protobuf

| Metric | Task | Value | Unit |
| --- | --- | --- | --- |
| Cumulative indexing time of primary shards | | 0.00223333 | min |
| Min cumulative indexing time across primary shards | | 0.00223333 | min |
| Median cumulative indexing time across primary shards | | 0.00223333 | min |
| Max cumulative indexing time across primary shards | | 0.00223333 | min |
| Cumulative indexing throttle time of primary shards | | 0 | min |
| Min cumulative indexing throttle time across primary shards | | 0 | min |
| Median cumulative indexing throttle time across primary shards | | 0 | min |
| Max cumulative indexing throttle time across primary shards | | 0 | min |
| Cumulative merge time of primary shards | | 0 | min |
| Cumulative merge count of primary shards | | 0 | |
| Min cumulative merge time across primary shards | | 0 | min |
| Median cumulative merge time across primary shards | | 0 | min |
| Max cumulative merge time across primary shards | | 0 | min |
| Cumulative merge throttle time of primary shards | | 0 | min |
| Min cumulative merge throttle time across primary shards | | 0 | min |
| Median cumulative merge throttle time across primary shards | | 0 | min |
| Max cumulative merge throttle time across primary shards | | 0 | min |
| Cumulative refresh time of primary shards | | 0.00111667 | min |
| Cumulative refresh count of primary shards | | 5 | |
| Min cumulative refresh time across primary shards | | 0.00111667 | min |
| Median cumulative refresh time across primary shards | | 0.00111667 | min |
| Max cumulative refresh time across primary shards | | 0.00111667 | min |
| Cumulative flush time of primary shards | | 0 | min |
| Cumulative flush count of primary shards | | 0 | |
| Min cumulative flush time across primary shards | | 0 | min |
| Median cumulative flush time across primary shards | | 0 | min |
| Max cumulative flush time across primary shards | | 0 | min |
| Total Young Gen GC time | | 0 | s |
| Total Young Gen GC count | | 0 | |
| Total Old Gen GC time | | 0 | s |
| Total Old Gen GC count | | 0 | |
| Store size | | 0.000251911 | GB |
| Translog size | | 5.12227e-08 | GB |
| Heap used for segments | | 0 | MB |
| Heap used for doc values | | 0 | MB |
| Heap used for terms | | 0 | MB |
| Heap used for norms | | 0 | MB |
| Heap used for points | | 0 | MB |
| Heap used for stored fields | | 0 | MB |
| Segment count | | 8 | |
| Min Throughput | index | 27541 | docs/s |
| Mean Throughput | index | 27541 | docs/s |
| Median Throughput | index | 27541 | docs/s |
| Max Throughput | index | 27541 | docs/s |
| 50th percentile latency | index | 30.1443 | ms |
| 100th percentile latency | index | 31.1228 | ms |
| 50th percentile service time | index | 30.1443 | ms |
| 100th percentile service time | index | 31.1228 | ms |
| error rate | index | 0 | % |
| Min Throughput | wait-until-merges-finish | 221.57 | ops/s |
| Mean Throughput | wait-until-merges-finish | 221.57 | ops/s |
| Median Throughput | wait-until-merges-finish | 221.57 | ops/s |
| Max Throughput | wait-until-merges-finish | 221.57 | ops/s |
| 100th percentile latency | wait-until-merges-finish | 3.96129 | ms |
| 100th percentile service time | wait-until-merges-finish | 3.96129 | ms |
| error rate | wait-until-merges-finish | 0 | % |
| Min Throughput | default | 138.62 | ops/s |
| Mean Throughput | default | 138.62 | ops/s |
| Median Throughput | default | 138.62 | ops/s |
| Max Throughput | default | 138.62 | ops/s |
| 100th percentile latency | default | 15.7351 | ms |
| 100th percentile service time | default | 5.31692 | ms |
| error rate | default | 0 | % |
| Min Throughput | range | 117.22 | ops/s |
| Mean Throughput | range | 117.22 | ops/s |
| Median Throughput | range | 117.22 | ops/s |
| Max Throughput | range | 117.22 | ops/s |
| 100th percentile latency | range | 15.5039 | ms |
| 100th percentile service time | range | 6.77485 | ms |
| error rate | range | 0 | % |

Bukhtawar commented 8 months ago

Just a quick note, please also benchmark/profile for CPU and JVM overhead reduction with the change during ser/de.

backslasht commented 8 months ago

@VachaShah - The 100th percentile is generally skewed and the 90th percentile is more reliable. Can you please share the numbers for the 90th and 99th percentiles? Also, are these single runs or an average of 'N' runs?

VachaShah commented 8 months ago

OSB runs 100 iterations for search, so this is the average over those 100 runs. OSB publishes the 100th percentile for the operations; I think the OSB code needs to be modified to get the other percentiles. @gkamat, is there a way to customize this from the command line?
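To illustrate why the choice of percentile matters here, the following sketch computes nearest-rank percentiles over 100 samples where a single request is slow. The latencies are made up and the helper `nearest_rank_percentile` is illustrative only; it is not how OSB computes its report.

```python
import math


def nearest_rank_percentile(samples, percentile):
    """Return the nearest-rank percentile (0 < percentile <= 100) of samples."""
    ordered = sorted(samples)
    rank = math.ceil(percentile * len(ordered) / 100)
    return ordered[rank - 1]


# Made-up latencies in ms: 99 fast requests and one slow outlier.
latencies_ms = [5.0] * 99 + [50.0]

summary = {p: nearest_rank_percentile(latencies_ms, p) for p in (50, 90, 99, 100)}
print(summary)
```

With 100 iterations, the 100th percentile is simply the slowest single request, so one outlier decides the reported number, while the 90th and 99th percentiles in this sample are unaffected by it.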

VachaShah commented 8 months ago

CPU Utilization

I analyzed the CPU used when running benchmarks and with async-profiler.

CPU used during search

async-profiler CPU profile

Default Search

[Screenshot: async-profiler CPU profile for default search]

Protobuf Search

[Screenshot: async-profiler CPU profile for protobuf search]

kiranprakash154 commented 7 months ago

Hi, are we on track for this to be released in 2.12?

getsaurabh02 commented 7 months ago

Given that we are trying to build cleaner abstractions around detecting the protocol and making it pluggable through some refactoring, this will need some more time. I believe the code will be in a good state by 2.13.

VachaShah commented 3 months ago

Update

With the discussion on using protobuf for API request/response and node-to-node communication in the transport layer, we have first taken up making the transport layer abstract to support multiple protocols for serialization and deserialization. This will decouple the node-to-node communication in the transport layer from the current serialization mechanism (which is now referred to as native protocol in the codebase).

After these changes, we will take up adding protobuf into the codebase for the search API in a way that the API request/response layer is not tightly coupled with the serialization mechanism. That coupling exists today, for example in how Writeable is implemented by model classes and request/response classes.
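The decoupling described above can be pictured with a small sketch in which the transport only knows a protocol interface and selects an implementation by name. Every name here (`WireProtocol`, `JsonProtocol`, `PackedProtocol`, `Transport`, the frame layout and the `took`/`hits` message) is hypothetical and chosen for illustration; none of it is taken from the OpenSearch codebase, and the packed encoding is only a stand-in for a schema-based format such as protobuf.

```python
import json
import struct
from abc import ABC, abstractmethod


class WireProtocol(ABC):
    """Hypothetical interface: one implementation per serialization protocol."""

    name = ""

    @abstractmethod
    def serialize(self, message):
        """Turn a message dict into bytes."""

    @abstractmethod
    def deserialize(self, payload):
        """Turn bytes back into a message dict."""


class JsonProtocol(WireProtocol):
    """Self-describing text encoding: field names travel with every message."""

    name = "json"

    def serialize(self, message):
        return json.dumps(message, sort_keys=True).encode("utf-8")

    def deserialize(self, payload):
        return json.loads(payload.decode("utf-8"))


class PackedProtocol(WireProtocol):
    """Schema-based binary stand-in: two unsigned 32-bit ints, no field names."""

    name = "packed"

    def serialize(self, message):
        return struct.pack(">II", message["took"], message["hits"])

    def deserialize(self, payload):
        took, hits = struct.unpack(">II", payload)
        return {"took": took, "hits": hits}


class Transport:
    """Frames a payload with its protocol name so the receiver can pick a decoder."""

    def __init__(self, protocols):
        self._by_name = {protocol.name: protocol for protocol in protocols}

    def encode(self, protocol_name, message):
        name_bytes = protocol_name.encode("ascii")
        payload = self._by_name[protocol_name].serialize(message)
        # Frame layout: 1-byte name length, protocol name, then the payload.
        return bytes([len(name_bytes)]) + name_bytes + payload

    def decode(self, frame):
        name_length = frame[0]
        name = frame[1:1 + name_length].decode("ascii")
        return self._by_name[name].deserialize(frame[1 + name_length:])


transport = Transport([JsonProtocol(), PackedProtocol()])
message = {"took": 12, "hits": 100}
json_frame = transport.encode("json", message)
packed_frame = transport.encode("packed", message)
print(len(json_frame), len(packed_frame))
```

The point of the design is that the message classes do not implement serialization themselves, so adding another protocol means registering one more `WireProtocol` implementation rather than touching every request and response class.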

Transport layer abstractions and decoupling

Introduction of protobuf for search API (WIP - might be divided into more PRs)