vikasvb90 opened 6 months ago
Great proposal @vikasvb90!
Impact on search latencies: Long-lived impact of increased search latencies during concurrent execution of id-based operations, because all id-based requests are routed to all shards instead of a single shard. As the number of shards grows, latencies increase drastically: with 2 shards, search latencies are 20% higher while concurrent id-based ops are executing; with 4 shards, 75% higher; with 8 shards, 200%; and with 16 shards, 150%.
Curious, with a higher number of shards we see better search performance because of concurrency, so how come it is different here?
Handling of Lucene's 2 billion doc limit breach: Since existing shards are not mutated, no further upserts can happen once a shard reaches the 2 billion doc count limit (including deleted docs).
This is per segment level, right? Not per shard.
On a high level, both of the approaches will re-index the underlying Lucene data structure. How is it different from doing the below?
Now in step 3, we need some modification in the re-index API to keep newer segments non-searchable until re-indexing finishes. Probably we can just copy the segments, modifying their segment info with the new shard details; doing it this way, we can avoid re-indexing the data as well.
Curious, with a higher number of shards we see better search performance because of concurrency, so how come it is different here?
In the shard expansion approach, since document ids are not visible to customers, search requests are routed to all shards instead of just one shard, because the coordinator won't be able to determine the right shard based on the doc id. Hence there is significantly more IO and compute, leading to higher search latencies.
This is per segment level, right? Not per shard.
No, this limit is imposed at the Lucene index level, i.e. an index shard in OpenSearch terms.
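For reference, a minimal sketch of the limit being discussed: Lucene caps the document count per Lucene index at `IndexWriter.MAX_DOCS`, which is `Integer.MAX_VALUE - 128` in current Lucene versions. Shown here without the Lucene dependency:

```java
public class LuceneDocLimit {
    // Mirrors Lucene's IndexWriter.MAX_DOCS constant. The limit applies
    // per Lucene index (one OpenSearch shard) and counts docs that are
    // deleted but not yet merged away.
    static final int MAX_DOCS = Integer.MAX_VALUE - 128;

    public static void main(String[] args) {
        System.out.println(MAX_DOCS); // 2147483519, roughly 2.147 billion
    }
}
```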
On a high level, both of the approaches will re-index the underlying Lucene data structure.
Not true. Re-indexing isn't part of any of the proposed options above. In the in-place physical split, no re-indexing is done; as I mentioned in the section "High level View of Split process", delete-by-query is executed on segments of the latest commit of the shard. And in shard expansion, an empty shard is brought up in the same shard key space and the old shard's data isn't mutated.
Will it be helpful to see if segments can be grouped on the routing partition which a shard caters to? E.g., if a shard caters to [0, 128), we can have 4 groups, {[0,32), [32,64), [64,96), [96,128)}, which would make the first log(num_groups) splits (i.e. 2 in this example) very efficient with respect to handling deletions of documents after a split, since the split can purge entire segments. New segments created within a child shard can continue to apply this invariant with respect to the shard's routing range, ensuring that splits remain optimized for at least half the data going forward.
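A minimal sketch of the grouping idea above (the helper name and fixed group count are illustrative, not an existing OpenSearch API): map a routing hash within the shard's range to one of a fixed number of contiguous sub-ranges, so that every doc in a segment group falls on the same side of a future split boundary.

```java
public class SegmentGrouping {
    // Hypothetical helper: given a shard covering [rangeStart, rangeEnd)
    // divided into numGroups contiguous sub-ranges, return the group
    // index for a routing hash. If segments are kept per-group, a split
    // at a group boundary can move whole segments instead of deleting
    // individual docs.
    static int groupFor(int hash, int rangeStart, int rangeEnd, int numGroups) {
        int width = (rangeEnd - rangeStart) / numGroups;
        return (hash - rangeStart) / width;
    }

    public static void main(String[] args) {
        // Shard caters to [0, 128) with 4 groups of width 32.
        System.out.println(groupFor(0, 0, 128, 4));   // group 0
        System.out.println(groupFor(33, 0, 128, 4));  // group 1
        System.out.println(groupFor(127, 0, 128, 4)); // group 3
    }
}
```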
@RS146BIJAY has an RFC around grouping the data in segments (https://github.com/opensearch-project/OpenSearch/issues/13183) which we are discussing, and I was thinking about how applicable it may be for such scenarios (though it seems aimed at optimizing data storage, it could support splits).
@mgodwan I agree. The ability to group segments based on partitions would be quite helpful, as it would significantly minimize the impact a split suffers from deleted documents. Due to that impact, a split operation may otherwise need to be planned; with this feature, a split could even be carried out based on signals, and dynamic scaling solutions could be built on top.
Forking in-place on a live index sounds extremely challenging if you want to do it with a single IndexWriter. Essentially, you would need to maintain two distinct index commit histories, which Lucene makes very difficult. (I initially tried a forked merge history for merge-on-commit.) Would the idea be to start a new smaller shard (or pair of smaller shards) and write updates to both the "big" shard and the smaller shard, then copy over segments from the big shard? (Using the addIndexes API on the small shard with a FilterCodec that hides the "other" documents could copy things over pretty quickly and avoid producing a bunch of deletes that need to get merged away.)
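The filtering such a codec would perform can be sketched without Lucene. This is a plain-Java stand-in for the live-docs view a filtering reader would expose during addIndexes, not Lucene's actual FilterCodecReader API: only documents whose routing hash falls inside the child shard's range are marked live, so the copy skips the rest instead of producing deletes.

```java
import java.util.BitSet;

public class ChildShardLiveDocs {
    // Hypothetical stand-in for the live-docs bitset a filtering codec
    // reader would expose: bit i is set iff doc i's routing hash falls
    // in the child shard's slice [childStart, childEnd) of the parent's
    // hash range. Docs with an unset bit appear "deleted" to addIndexes.
    static BitSet liveDocs(int[] docHashes, int childStart, int childEnd) {
        BitSet live = new BitSet(docHashes.length);
        for (int i = 0; i < docHashes.length; i++) {
            if (docHashes[i] >= childStart && docHashes[i] < childEnd) {
                live.set(i);
            }
        }
        return live;
    }

    public static void main(String[] args) {
        // Parent covers [0, 128); this child takes the lower half [0, 64).
        int[] hashes = {5, 70, 63, 100};
        System.out.println(liveDocs(hashes, 0, 64)); // {0, 2}
    }
}
```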
The approach that I've implemented in the past (12 years ago, we built a Lucene-based search service internal to Amazon that supported live resharding) essentially involved an upstream transaction log (conceptually similar to Kafka or Kinesis). We would spin up two smaller shards (each covering half of the hash range of the earlier shard) that would restore a snapshot of the big shard, each delete half the docs, then they'd get caught up on the "live" updates by reading from the translog.
@msfroh Thanks for the comment! The idea is not to use the same IndexWriter; it is actually along the same lines as the approach you used for live resharding. Instead of a restore, segments of the source shard will be hard-linked into the directories of the child shards, respective IndexWriters will be opened, documents not belonging to each child shard will be deleted (to be merged away later), and then we will perform a recovery based on the peer-recovery flow to bring the child shards in sync with the parent. A shard split mechanism is already present in OpenSearch: ShardSplittingQuery. Most of the work will be around online recovery of the split shards, i.e. while handling live traffic.
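The hard-link step described above can be sketched with plain java.nio (directory layout and file names here are illustrative; the real flow wires this into Lucene directories and the recovery machinery):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class HardlinkSegments {
    // Hypothetical helper: hard-link every file of the parent shard
    // directory into a child shard directory. Hard links make the
    // child's starting copy near-instant and cost no extra storage;
    // the child's own IndexWriter then deletes docs outside its
    // hash range, to be merged away later.
    static void linkSegments(Path parentDir, Path childDir) throws IOException {
        Files.createDirectories(childDir);
        try (DirectoryStream<Path> files = Files.newDirectoryStream(parentDir)) {
            for (Path f : files) {
                Files.createLink(childDir.resolve(f.getFileName()), f);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path parent = Files.createTempDirectory("parent-shard");
        Files.writeString(parent.resolve("_0.cfs"), "segment-bytes");
        Path child = parent.resolveSibling(parent.getFileName() + "-child0");
        linkSegments(parent, child);
        System.out.println(Files.exists(child.resolve("_0.cfs"))); // true
    }
}
```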
I am almost done with the design and am going to publish it in a day or two. I look forward to hearing your thoughts on it.
Problem Statement
In search use cases where the entire data is supposed to be hot, i.e. available for search throughout its lifetime, it is hard to predict the number of primary shards in advance. As a result, the OpenSearch cluster can get into a hot-shard or large-shard problem, where the storage capacity of the node hosting a shard becomes insufficient, or Lucene's hard limit on the number of docs in a Lucene index kicks in. This generally happens in use cases where custom routing keys are used. Today, there are two options available for users to get out of this problem: reindex and the _split API. With reindexing, the entire data of the index is reindexed into a new index having a greater number of primaries, which is known to be a very slow process and requires a huge amount of additional compute and IO on top of regular search and indexing operations. With the _split API, the index is first marked as read-only and then a split of all shards of the index is performed, which causes write downtime for users. Also, the _split API doesn't provide the granularity of splitting at the shard level, so a single hot shard cannot be scaled out independently. In addition, free disk space equal to the entire size of the index is required until the old index is deleted. Hence, both approaches have their own limitations. This proposal looks at scaling of shards holistically and proposes options to address the problems above.
Requirements
Functional
Non Functional
Proposals
Note: Options below are just proposals and separate issue would be shared with the detailed high level design.
Option 1 - Shard Key Space Expansion
Each shard in OpenSearch is responsible for serving documents whose routing ids hash to the shard's key space. In this approach, once a threshold set against a shard is breached, a new writable empty shard is created and the old shard of the same key space is made read/update/delete-only. New documents get ingested into the new shard, which can be hosted on the same or a different node as per the allocation policy. Searches for the shard's key space are routed to all shards present in that key space. This also includes get-by-id, update-by-id and delete-by-id, for which search queries need to be executed first to find the docs belonging to one or more shards in the key space, followed by the respective id-based operation. Since the shard skew problem is generally caused by custom doc ids, bulk calls containing custom doc ids would translate to a search followed by a bulk, because doc ids first need to be fetched to distinguish each operation as an update or an insert. To accommodate updates or deletes on data present in the old shard, additional buffer storage is reserved for deletion and merges of any or all docs of the shard.
Pros
Cons
[Preferred] Option 2 - In-Place Physical Shard Split
In this approach, shard data is split into child shards depending on the child hash ranges. There are two parts to this design:
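As a rough sketch of the hash-range arithmetic involved (the helper and the even two-way split are illustrative assumptions, not the final design), each child shard takes a contiguous slice of the parent's hash range:

```java
public class HashRangeSplit {
    // Hypothetical helper: split a parent range [start, end) into
    // `children` contiguous child ranges. A doc stays in the child
    // whose range contains its routing hash; in the other children's
    // hard-linked copies it is deleted and later merged away.
    static int[][] splitRange(int start, int end, int children) {
        int width = (end - start) / children;
        int[][] ranges = new int[children][2];
        for (int i = 0; i < children; i++) {
            ranges[i][0] = start + i * width;
            ranges[i][1] = (i == children - 1) ? end : start + (i + 1) * width;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Split a parent covering [0, 128) into two children.
        for (int[] r : splitRange(0, 128, 2)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
        // prints [0, 64) then [64, 128)
    }
}
```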
Pros
Cons
Offline _split API vs In-Place physical shard split
ShardSplittingQuery and build an in-place recovery approach on top to recover child shards against the source shard.
In-place Split API
We have 2 options for this:
_split API with more options
The preference would be to go with Option 2 and create a new API. They are inherently different: the current _split creates a new index, whereas the new API will perform an in-place split of a shard of an index. So, keeping them under a single API would be confusing. Please share your opinion on the preferred option for the in-place split API.
POC conducted for in-place physical shard split
Following are some of the observations based on the benchmark carried out:
How Can You Help?
Any general comments about the overall direction are welcome. At this point we believe that option 2 fits more into solving the listed problems. Some specific questions:
Next Steps
We will incorporate the feedback from this RFC into a more detailed proposal/high level design. We will then create meta issues to go in more depth on the components involved to continue the detailed design.
Appendix
Latencies and throughput of In-Place split
Latencies and throughput of new child shards created by an in-place split will eventually be on par with those of a new index. Because the split performs a delete-by-query in the background to remove documents from child shards which do not belong to them, the number of deleted documents on those shards rises, and an impact on search latency, and hence on throughput, can be seen. Once the deleted documents are merged away by the tiered merge policy, search latencies return to normal.
Routing Logic
In OpenSearch, routing of a document having a custom id happens via a murmur3 hash function, which generates a 32-bit hash that is then used to compute the shard id using the below logic:
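The original snippet is not reproduced above. As a hedged stand-in (the hash stage is elided, and the real OpenSearch OperationRouting additionally involves a routing factor for the number-of-routing-shards scheme, which may differ from this simplification), the shard-id step looks roughly like:

```java
public class RoutingSketch {
    // Simplified stand-in: OpenSearch hashes the routing value / _id
    // with murmur3 (not reimplemented here) and maps the resulting
    // 32-bit hash onto a shard. Math.floorMod keeps the result
    // non-negative even for negative hashes, unlike the % operator.
    static int shardId(int murmur3Hash, int numShards) {
        return Math.floorMod(murmur3Hash, numShards);
    }

    public static void main(String[] args) {
        System.out.println(shardId(42, 24));          // 18
        System.out.println(shardId(-2147483000, 24)); // non-negative
    }
}
```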
Benchmark Setup
Used 3 r5.2xlarge instances.
Index with 24 shards and 1 replica. Number of clients used = 8.
Downloaded a 1.4 GB document JSON file of the http_logs workload consisting of 11,961,342 docs.
Created a custom workload with data of the following type, which had a custom_id field that was later used to perform update_by_query benchmarks. The value of the custom_id field was kept the same as the _id field. During workload creation, the value of this field was set to 1 as follows:
Executed a custom update_by_id workload, having entries as follows, to update the custom_id field.
The custom workload to update the custom_id field was only executed for 1000001 docs. This was done to prevent OSB eventually running into an OOM error, which happens when it reaches a certain threshold of requests. This is probably due to a build-up of a large number of stats, proportional to the number of requests, in memory.
At the end of the run, the count of documents where custom_id wasn't equal to 1 was 1000001, as we intended.
Deleted remaining docs by performing a delete_by_query on docs where custom_id was not 1.
As a result, an index was produced consisting of docs of the following type, where the _id field could be used to benchmark the update_by_id use case and the custom_id field could be used to benchmark the update_by_query use case.
Benchmark Results
Impact on search latencies due to the overhead of deleted docs after split
Cluster configuration: 3 m6g.large (2 vCPUs) nodes in a non-dedicated setup, having 500 GB disk space each. This benchmark uses OpenSearch's existing offline index split to measure the impact on search.
Benchmark before split
Benchmark after split
Thanks a lot @shwetathareja for the valuable insights!