opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[Feature Proposal] Merging Segments in Remote Store #8105

Open sachinpkale opened 1 year ago

sachinpkale commented 1 year ago

This RFC is still Work In Progress. I will keep updating it based on the results of the POCs that need to be performed to validate some of the assumptions made in this doc.

Problem Statement

With the Searchable Snapshot and Searchable Remote Index features, OpenSearch has introduced a concept of data tiers: a hot index has all of its data on local disk as well as in the remote store, while a warm index keeps the entire data only in the remote store and downloads parts of it to local disk as required. With the proposed Writable Remote Index feature, OpenSearch will also support writing to warm indices. Depending on the write pattern, this can create many small segments for an index/shard, which increases read latency roughly in proportion to the number of segments. In order to keep read latency bounded, we need to apply the same logic that OpenSearch/Lucene applies to local data: merging the segments. As the entire data set lives in the remote store, we need to define an approach that incrementally merges the segments with minimal impact on the OpenSearch cluster.

Requirements

Functional

Non-Functional

Background - Segment Merges in OpenSearch

Whenever the segments in an index have been altered by IndexWriter, IndexWriter invokes MergePolicy.findMerges to pick segments to be merged. OpenSearch uses OpenSearchTieredMergePolicy, which is a wrapper over Lucene’s TieredMergePolicy. The overall idea of TieredMergePolicy is to merge segments of approximately equal size. The merge policy can return more than one merge at a time; in that case, if the writer is using SerialMergeScheduler, the merges run sequentially, but if it is using ConcurrentMergeScheduler, they run concurrently.

The Javadoc for MergePolicyConfig provides a good overview of existing segment merges in OpenSearch. A quick TL;DR from the Javadoc: “For normal merging, the policy first computes a "budget" of how many segments are allowed to be in the index. If the index is over-budget, then the policy sorts segments by decreasing size (proportionally considering percent deletes), and then finds the least-cost merge. Merge cost is measured by a combination of the "skew" of the merge (size of largest seg divided by smallest seg), total merge size and pct deletes reclaimed, so that merges with lower skew, smaller size and those reclaiming more deletes, are favored."
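
To make the least-cost selection a bit more concrete, here is a simplified sketch of how such a merge score could be computed, following the Javadoc description above. This is illustrative only; the exponents and names are assumptions and the real TieredMergePolicy implementation differs in its details.

```java
// Simplified, illustrative merge scoring in the spirit of the Javadoc above.
// Lower score is better: low skew, smaller total size, and more reclaimed deletes win.
static double scoreCandidateMerge(long largestSegmentBytes, long smallestSegmentBytes,
                                  long totalBytesBeforeMerge, long totalBytesAfterMerge) {
    double skew = (double) largestSegmentBytes / smallestSegmentBytes; // penalize uneven merges
    double sizePenalty = Math.pow(totalBytesAfterMerge, 0.05);         // mild penalty for large merges
    double nonDeletedRatio = (double) totalBytesAfterMerge / totalBytesBeforeMerge;
    double deleteReclaimFactor = Math.pow(nonDeletedRatio, 2.0);       // favor merges that reclaim deletes
    return skew * sizePenalty * deleteReclaimFactor;
}
```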

The following dynamic settings from MergePolicyConfig control segment merging (a Lucene-level sketch follows the list):

  1. index.merge.policy.max_merge_at_once
  2. index.merge.policy.max_merged_segment
  3. index.merge.policy.segments_per_tier
  4. index.merge.policy.expunge_deletes_allowed
  5. index.merge.policy.floor_segment
  6. index.merge.policy.deletes_pct_allowed
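
These settings map onto Lucene’s TieredMergePolicy. For reference, here is a minimal Lucene-level sketch of the equivalent configuration; the values below are illustrative, not the proposed defaults for warm indices.

```java
import org.apache.lucene.index.TieredMergePolicy;

class WarmMergePolicyFactory {
    // Illustrative mapping of the index.merge.policy.* settings onto Lucene's TieredMergePolicy.
    static TieredMergePolicy create() {
        TieredMergePolicy mergePolicy = new TieredMergePolicy();
        mergePolicy.setMaxMergeAtOnce(10);                // index.merge.policy.max_merge_at_once
        mergePolicy.setMaxMergedSegmentMB(5 * 1024);      // index.merge.policy.max_merged_segment
        mergePolicy.setSegmentsPerTier(10.0);             // index.merge.policy.segments_per_tier
        mergePolicy.setForceMergeDeletesPctAllowed(10.0); // index.merge.policy.expunge_deletes_allowed
        mergePolicy.setFloorSegmentMB(2.0);               // index.merge.policy.floor_segment
        mergePolicy.setDeletesPctAllowed(20.0);           // index.merge.policy.deletes_pct_allowed
        return mergePolicy;
    }
}
```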

Proposed Solution

Segment merge is a resource-intensive (CPU and I/O) operation. An ideal solution for merging segments of warm indices should ensure a predictable impact on CPU and I/O utilization. It also needs to account for the extra disk space required to download the segments to be merged.

The following solution explores an automated way of merging segments. Most of the steps in the algorithm remain the same for the manual mode, which involves invoking the _forcemerge API explicitly.

Algorithm

The solution will have the following steps. For most of these steps, we provide multiple options. It is possible that we will implement more than one option for a step and decide which option to use based on the user’s input or the OpenSearch cluster state at the time of the merge.

  1. Segment Merge Trigger - Currently, for a hot index, whenever the segments in an index are altered by IndexWriter (the addition of a newly flushed segment, the addition of many segments from addIndexes* calls, or a previous merge that may now need to cascade), IndexWriter invokes findMerges. We have the following two options for a warm index:
    1. The same trigger can be used for segment merges of the warm index as well. This assumes that when an index is migrated from hot to warm, its segments were already in an optimal state where no further merges are required. Any new writes to the warm index will create new segments, which will require a merge check.
    2. If the assumption in the above option does not hold, the warm index will remain in a non-optimized state (too many segments) until the next segment is created, which will impact read performance. To avoid this, we need a periodic job that checks the segment state of each warm index and triggers a merge if required. Depending on the interval of this periodic job, a particular index may remain un-optimized until the next job run.
  2. Find segments to merge - TieredMergePolicy finds segments to merge based on configured settings that decide the frequency of merges, the number of segments to merge, the number of merged segments, etc. These settings can have different values for hot and warm indices. For example, the value of max_merge_at_once defaults to 10 for a hot index; as we need to download segments in order to merge them, the value for a warm index also needs to take available disk space into account. The exact default value can be determined as we explore the solution in detail as part of the low-level design. One more thing to note: TieredMergePolicy uses the segments_N file to get the required info on existing segments, so in this step we don’t need to download/read individual segments; reading the latest segments_N file is sufficient.
  3. Download/Read segments - Once we identify the segments to be merged, we need to make them available for the merge operation. There are three options for reading the segments:
    1. Download the segment files to be merged to local disk and proceed with the merge as it happens for local segments. Assuming an average segment size of A MB and B segments to be merged, we need to download A*B MB of data, which consumes disk space as well as disk write throughput.
    2. Block-based fetch. We download blocks of X MB at a time and proceed with the merge. If the total number of segments to be merged is Y, we never need more than X * Y MB of data downloaded to local disk at any time. Before downloading the next block, we can delete previously downloaded blocks. This way, the disk space required for the merge is reduced and disk I/O is staggered over time. This option is inspired by the Searchable Snapshot design. Currently, this is theoretical and we need to perform a POC to check the feasibility of this option.
    3. Read segments directly from the remote store using RemoteDirectory. No data needs to be downloaded. This option reduces disk I/O as well as the extra disk space required to download the segments, but it will be slower than merging segments that are present on disk. We also need to check the feasibility of this option by performing a POC.
  4. Merge Operation - The actual merge operation is executed in SegmentMerger.merge(), which is called from IndexWriter. This step does not change that code flow; rather, it decides on the MergeScheduler to be used. For hot indices, OpenSearch uses ConcurrentMergeScheduler, but as merges of warm indices need to be aware of resource consumption as well, we may go with SerialMergeScheduler in order to control the number of parallel merges. So we have two options in this step (see the sketch after this list):
    1. ConcurrentMergeScheduler
    2. SerialMergeScheduler
  5. Upload merged segment and delete merged-away segments - Once the merge completes, the merged segment should be uploaded to the remote segment store. The merged-away segments must be deleted only after their ref count drops to 0.
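
For step 4, here is a minimal sketch of how the scheduler choice could be wired into an IndexWriterConfig, assuming a simple boolean switch for warm indices; the concurrency bounds shown are illustrative, not proposed defaults.

```java
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SerialMergeScheduler;

class WarmMergeSchedulerConfig {
    // Sketch: pick the merge scheduler for warm-index merges.
    // SerialMergeScheduler runs one merge at a time; ConcurrentMergeScheduler is tightly bounded here.
    static IndexWriterConfig configure(IndexWriterConfig config, boolean useSerialScheduler) {
        if (useSerialScheduler) {
            config.setMergeScheduler(new SerialMergeScheduler());
        } else {
            ConcurrentMergeScheduler scheduler = new ConcurrentMergeScheduler();
            scheduler.setMaxMergesAndThreads(2, 1); // at most 2 pending merges, 1 merge thread (illustrative)
            config.setMergeScheduler(scheduler);
        }
        return config;
    }
}
```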

Performance Impact

One of the important considerations for merging segments in the remote store is the extra resource consumption and its impact on performance. Problems arise if this resource consumption is unbounded and starts impacting other critical flows of OpenSearch. As we can’t completely avoid this extra resource consumption, we need to find a way to either bound or isolate it.

For both of the approaches mentioned below, the overall idea is to introduce a new node role: merge. A node with this role will be responsible for merging segments from the remote store by executing the steps mentioned in the algorithm above. The role is named merge rather than remote_merge to keep it extensible to merge operations for hot indices as well (which are not in the scope of this RFC).

  1. Using a separate fleet of merge nodes - In this approach, we create a set of nodes with a single role: merge. By performing segment merges on different nodes, we isolate the performance impact from the data nodes, but it also adds the overhead of managing a separate fleet of nodes. merge nodes need to stay in sync with the data nodes where search/write operations are being performed for the same index/shard. The low-level details of how merge nodes work will be covered in the design document. This approach assumes that warm indices are hosted on the data nodes. If we decide to use a separate node role for warm indices (not in the scope of this RFC) and have a separate fleet for them, we already achieve resource isolation and may not need separate merge nodes.
  2. Using existing data nodes with resource isolation - In this approach, we perform merges on the existing data nodes by attaching the merge role to a set of data nodes in the cluster. We can control resource consumption to some extent by tuning MergePolicy configs and the threadpool size, but even with single-threaded execution, we won’t be able to fully control CPU, memory, and disk consumption. For example, if expensive search queries are already causing high CPU and memory usage and a segment merge is triggered, the resulting spikes in resource usage can cause performance degradation as well as cluster stability issues. We need a complete resource isolation mechanism that isolates resource usage at a much more granular level, such as off-heap memory usage. This approach will still be useful if writes to warm indices are infrequent, resulting in less frequent merges, as we then don’t need to manage a separate set of merge nodes.

FAQs

  1. Will I be able to achieve different read performance for different warm indices?
    1. If we only consider the number of segments, read performance is proportional to the number of segments in a shard. We can improve performance by reducing the number of segments to 1, either by invoking the _forcemerge API or by changing MergePolicy configs (see the sketch after these FAQs).
  2. Can I completely disable automated merging and perform manual merge in off-peak hours?
    1. We can achieve this by deferring segment merges through MergePolicy configs such as segments_per_tier.
  3. Can this solution be extended to hot indices as well?
    1. Yes, the idea is to use merge nodes to handle all merge operations. We need to introduce a priority-queue-based mechanism: if multiple merges are triggered at the same time, hot indices take precedence.
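
For FAQs 1 and 2, here is a minimal Lucene-level sketch of deferring automatic merges and then force-merging explicitly during off-peak hours. The _forcemerge API ultimately drives IndexWriter.forceMerge; the wiring below and the segments_per_tier value are illustrative, not OpenSearch internals.

```java
import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;

class OffPeakMergeSketch {
    // Sketch: defer background merging, then merge down to one segment off-peak.
    static void mergeOffPeak(Directory directory) throws IOException {
        TieredMergePolicy deferredPolicy = new TieredMergePolicy();
        // Deferring merges by tolerating many segments per tier mirrors the
        // segments_per_tier suggestion in FAQ 2 (the value is illustrative).
        deferredPolicy.setSegmentsPerTier(1000.0);

        IndexWriterConfig config = new IndexWriterConfig().setMergePolicy(deferredPolicy);
        try (IndexWriter writer = new IndexWriter(directory, config)) {
            // ... writes accumulate small segments without triggering merges ...

            // Off-peak: merge the shard down to a single segment, which is what
            // the _forcemerge API with max_num_segments=1 ultimately drives.
            writer.forceMerge(1);
        }
    }
}
```
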
BhumikaSaini-Amazon commented 1 year ago

Given that a new or existing node (which may not necessarily be the primary for the IndexShard that the segments belong to) may now have the merge role, could there be a clash between remote store writes from a shard’s primary and those from a merge node? How would remote store writes from a merge node supersede in this scenario?

andrross commented 1 year ago

Thanks @sachinpkale!

Segment Merge Trigger

I believe a hot index is always going to be in a state allowed by the merge policy, because if it was not then it would have performed a merge in order to become compliant with the policy. I think maybe what is implicit here is that when an index is moved to "warm", a different segment topology may be desired in order to prioritize reads. Is this what you mean by the index being in an optimal state? I'm thinking this is not specific to hot/warm or even remote store, i.e. if you have a normal hot/local index that transitions from read/write to predominantly read, then you might want a different merge policy to reduce the number of segments. I would lean towards solving this by using and/or expanding on the existing capabilities by say changing the merge policy of the index (when and where to perform any merges is an interesting question I'll touch on later).

Download/Read segments

I think this can be an implementation optimization of the warm storage layer. The layer in Lucene doing the merging operates via the Directory interface and shouldn't need to care about how the data is fetched. Also, when opening a file Lucene will pass an IOContext that will specify if it is being used for a merge. The warm storage layer can definitely be optimized here, namely to avoid doing the read-through-cache behavior because it probably never makes sense to cache a file that is going to be merged away!
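
Building on the IOContext point, a hypothetical FilterDirectory could route merge-time reads around the read-through cache. A minimal sketch follows; the class and the cached/uncached delegates are assumptions for illustration, not existing OpenSearch code.

```java
import java.io.IOException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

// Hypothetical wrapper: cached reads for searches, cache-bypassing reads for merges.
class MergeAwareDirectory extends FilterDirectory {
    private final Directory uncachedRemote; // hypothetical remote-backed Directory without read-through caching

    MergeAwareDirectory(Directory cachedLocal, Directory uncachedRemote) {
        super(cachedLocal);
        this.uncachedRemote = uncachedRemote;
    }

    @Override
    public IndexInput openInput(String name, IOContext context) throws IOException {
        // Lucene tags merge-time reads in the IOContext; don't pollute the cache
        // with files that are about to be merged away.
        if (context.context == IOContext.Context.MERGE) {
            return uncachedRemote.openInput(name, context);
        }
        return in.openInput(name, context);
    }
}
```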

Using separate fleet of merge nodes

I think this is a super interesting idea and is potentially applicable beyond "warm" indexes. I think a lot of the OpenSearch/Lucene architecture is built on having a single writer for a given shard, right? Assuming a shard remains writable during this background merge, then don't we now have another writer for that shard that will have to coordinate with the existing primary? Or do you think the shard goes read-only during the background merge?

anasalkouz commented 1 year ago

Thanks @sachinpkale for the proposal.

Is having many segments still an issue with Concurrent Search? Do we have data to validate the need for remote segment merging with concurrent search?

Did you consider introducing a new segment merge implementation optimized for remote store that doesn't require downloading segment files locally?

itiyama commented 1 year ago

@sachinpkale Great proposal!

  1. A score is calculated when you select segments for merging. You could try tweaking the score based on whether segments are available in the remote store or locally, e.g. the block-fetch algorithm and the merge algorithm should work in tandem to prioritize merging of recent segments. Recency is not a dimension in scoring today, but it could be a useful heuristic for warm data.
  2. You may need to revisit the default_cfs_ratio merge setting for warm indices. This results in creating too many small segment files, especially the metadata files like kdm, fnm etc.

assumption that when an index is migrated from hot to warm, its segments were already in an optimal state where no further merges are required.

You can achieve this by waiting for all ongoing merges to complete during the hot-to-warm transition when there are no writes. If you do this, as long as there are no writes, this assumption will hold true.

Read segments directly from remote store using RemoteDirectory.

Directly? Where are the blocks that are read made available? On heap or off-heap?

Assuming a shard remains writable during this background merge, then don't we now have another writer for that shard that will have to coordinate with the existing primary? Or do you think the shard goes read-only during the background merge?

One way of thinking about this could be that the primary is still responsible for metadata writes (read: locks), but it hands off the merge work to the merge node? @sachinpkale @andrross

itiyama commented 1 year ago

Yes, the idea is to use merge nodes to handle all the merge operations.

I don't think this is a good idea, especially if the data is available locally. Merging small segments doesn't consume a lot of resources, so you merge them immediately instead of moving the data around, which would itself be costly, right? Maybe you need to implement a cost function to decide whether merging something locally is cheaper than the cost of moving it to a different node, especially with encryption. Background merging may be better for large merges. You could build tiers depending on some segment properties and only offload the merge work depending on the tier. A lot of compaction algorithms for cloud-native systems use this strategy.
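
A toy version of the local-versus-offload cost check suggested here could look like the following; all thresholds, weights, and names are illustrative assumptions, not measured costs.

```java
class MergePlacementHeuristic {
    // Toy heuristic: offload only large merges, and only when moving the bytes
    // (with an assumed encryption/transfer overhead) looks cheaper than burning
    // scarce local CPU headroom on the data node.
    static boolean shouldOffloadMerge(long totalMergeBytes, double localCpuHeadroom) {
        final long LARGE_MERGE_BYTES = 1L << 30;                                 // illustrative 1 GiB tier boundary
        double transferCost = totalMergeBytes * 1.2;                             // ~20% assumed overhead for encryption/transfer
        double localCost = totalMergeBytes / Math.max(localCpuHeadroom, 0.05);   // less headroom -> costlier local merge
        return totalMergeBytes >= LARGE_MERGE_BYTES && transferCost < localCost;
    }
}
```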

msfroh commented 1 year ago

If we'll have remote merging nodes available, should they be able to merge shards as well?

You can use many hot shards during ingestion to increase throughput, then merge them into fewer shards before making them available for searching.

@vigyasharma gave a talk about that at ApacheCon NA in 2022: https://www.apachecon.com/acna2022/slides/04_Sharma_Vigya_Decoupling_Indexing_and_Search.pdf